1 // Copyright 2022 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 use std::cell::RefCell;
6 use std::collections::HashMap;
7 use std::fmt;
8 use std::fmt::Display;
9 use std::io;
10 use std::io::Read;
11 use std::io::Write;
12 use std::os::windows::io::RawHandle;
13 use std::rc::Rc;
14 use std::result;
15 use std::sync::Arc;
16 use std::thread;
17
18 use anyhow::anyhow;
19 use anyhow::Context;
20 use base::error;
21 use base::info;
22 use base::named_pipes;
23 use base::named_pipes::BlockingMode;
24 use base::named_pipes::FramingMode;
25 use base::named_pipes::OverlappedWrapper;
26 use base::named_pipes::PipeConnection;
27 use base::warn;
28 use base::AsRawDescriptor;
29 use base::Error as SysError;
30 use base::Event;
31 use base::EventExt;
32 use base::WorkerThread;
33 use cros_async::select2;
34 use cros_async::select6;
35 use cros_async::sync::Mutex;
36 use cros_async::AsyncError;
37 use cros_async::EventAsync;
38 use cros_async::Executor;
39 use cros_async::SelectResult;
40 use data_model::Le32;
41 use data_model::Le64;
42 use futures::channel::mpsc;
43 use futures::pin_mut;
44 use futures::stream::FuturesUnordered;
45 use futures::FutureExt;
46 use futures::SinkExt;
47 use futures::StreamExt;
48 use remain::sorted;
49 use thiserror::Error as ThisError;
50 use vm_memory::GuestMemory;
51 use zerocopy::AsBytes;
52 use zerocopy::FromBytes;
53
54 use crate::virtio::async_utils;
55 use crate::virtio::copy_config;
56 use crate::virtio::vsock::sys::windows::protocol::virtio_vsock_config;
57 use crate::virtio::vsock::sys::windows::protocol::virtio_vsock_event;
58 use crate::virtio::vsock::sys::windows::protocol::virtio_vsock_hdr;
59 use crate::virtio::vsock::sys::windows::protocol::vsock_op;
60 use crate::virtio::vsock::sys::windows::protocol::TYPE_STREAM_SOCKET;
61 use crate::virtio::DescriptorError;
62 use crate::virtio::DeviceType;
63 use crate::virtio::Interrupt;
64 use crate::virtio::Queue;
65 use crate::virtio::Reader;
66 use crate::virtio::SignalableInterrupt;
67 use crate::virtio::VirtioDevice;
68 use crate::virtio::Writer;
69 use crate::Suspendable;
70
71 #[sorted]
72 #[derive(ThisError, Debug)]
73 pub enum VsockError {
74 #[error("Failed to await next descriptor chain from queue: {0}")]
75 AwaitQueue(AsyncError),
76 #[error("OverlappedWrapper error.")]
77 BadOverlappedWrapper,
78 #[error("Failed to clone descriptor: {0}")]
79 CloneDescriptor(SysError),
80 #[error("Failed to create EventAsync: {0}")]
81 CreateEventAsync(AsyncError),
82 #[error("Failed to create queue reader: {0}")]
83 CreateReader(DescriptorError),
84 #[error("Failed to create wait context: {0}")]
85 CreateWaitContext(SysError),
86 #[error("Failed to create queue writer: {0}")]
87 CreateWriter(DescriptorError),
88 #[error("Failed to read queue: {0}")]
89 ReadQueue(io::Error),
90 #[error("Failed to reset event object: {0}")]
91 ResetEventObject(SysError),
92 #[error("Failed to run executor: {0}")]
93 RunExecutor(AsyncError),
94 #[error("Failed to write to pipe, port {0}: {1}")]
95 WriteFailed(PortPair, io::Error),
96 #[error("Failed to write queue: {0}")]
97 WriteQueue(io::Error),
98 }
99 pub type Result<T> = result::Result<T, VsockError>;
100
101 // Vsock has three virt IO queues: rx, tx, and event.
102 const QUEUE_SIZE: u16 = 256;
103 const QUEUE_SIZES: &[u16] = &[QUEUE_SIZE, QUEUE_SIZE, QUEUE_SIZE];
104 // We overload port numbers so that if message is to be received from
105 // CONNECTION_EVENT_PORT_NUM (invalid port number), we recognize that a
106 // new connection was set up.
107 const CONNECTION_EVENT_PORT_NUM: u32 = u32::MAX;
108
109 /// Number of bytes in a kilobyte. Used to simplify and clarify buffer size definitions.
110 const KB: usize = 1024;
111
112 /// Size of the buffer we read into from the host side named pipe. Note that data flows from the
113 /// host pipe -> this buffer -> rx queue.
114 /// This should be large enough to facilitate fast transmission of host data, see b/232950349.
115 const TEMP_READ_BUF_SIZE_BYTES: usize = 4 * KB;
116
117 /// In the case where the host side named pipe does not have a specified buffer size, we'll default
118 /// to telling the guest that this is the amount of extra RX space available (e.g. buf_alloc).
119 /// This should be larger than the volume of data that the guest will generally send at one time to
120 /// minimize credit update packtes (see MIN_FREE_BUFFER_PCT below).
121 const DEFAULT_BUF_ALLOC_BYTES: usize = 128 * KB;
122
123 /// Minimum free buffer threshold to notify the peer with a credit update
124 /// message. This is in case we are ingesting many messages without an
125 /// opportunity to send a message back to the peer with a buffer size update.
126 /// This value is a percentage of `buf_alloc`.
127 /// TODO(b/204246759): This value was chosen without much more thought than "it
128 /// works". It should probably be adjusted, along with DEFAULT_BUF_ALLOC, to a
129 /// value that makes empirical sense for the packet sizes that we expect to
130 /// receive.
131 /// TODO(b/239848326): Replace float with integer, in order to remove risk
132 /// of losing precision. Ie. change to `10` and perform
133 /// `FOO * MIN_FREE_BUFFER_PCT / 100`
134 const MIN_FREE_BUFFER_PCT: f64 = 0.1;
135
136 // Number of packets to buffer in the tx processing channels.
137 const CHANNEL_SIZE: usize = 256;
138
139 /// Virtio device for exposing entropy to the guest OS through virtio.
140 pub struct Vsock {
141 guest_cid: u64,
142 host_guid: Option<String>,
143 features: u64,
144 worker_thread: Option<WorkerThread<()>>,
145 }
146
147 impl Vsock {
new(guest_cid: u64, host_guid: Option<String>, base_features: u64) -> Result<Vsock>148 pub fn new(guest_cid: u64, host_guid: Option<String>, base_features: u64) -> Result<Vsock> {
149 Ok(Vsock {
150 guest_cid,
151 host_guid,
152 features: base_features,
153 worker_thread: None,
154 })
155 }
156
get_config(&self) -> virtio_vsock_config157 fn get_config(&self) -> virtio_vsock_config {
158 virtio_vsock_config {
159 guest_cid: Le64::from(self.guest_cid),
160 }
161 }
162 }
163
164 impl VirtioDevice for Vsock {
keep_rds(&self) -> Vec<RawHandle>165 fn keep_rds(&self) -> Vec<RawHandle> {
166 Vec::new()
167 }
168
read_config(&self, offset: u64, data: &mut [u8])169 fn read_config(&self, offset: u64, data: &mut [u8]) {
170 copy_config(data, 0, self.get_config().as_bytes(), offset);
171 }
172
device_type(&self) -> DeviceType173 fn device_type(&self) -> DeviceType {
174 DeviceType::Vsock
175 }
176
queue_max_sizes(&self) -> &[u16]177 fn queue_max_sizes(&self) -> &[u16] {
178 QUEUE_SIZES
179 }
180
features(&self) -> u64181 fn features(&self) -> u64 {
182 self.features
183 }
184
ack_features(&mut self, value: u64)185 fn ack_features(&mut self, value: u64) {
186 self.features &= value;
187 }
188
activate( &mut self, mem: GuestMemory, interrupt: Interrupt, mut queues: Vec<(Queue, Event)>, ) -> anyhow::Result<()>189 fn activate(
190 &mut self,
191 mem: GuestMemory,
192 interrupt: Interrupt,
193 mut queues: Vec<(Queue, Event)>,
194 ) -> anyhow::Result<()> {
195 if queues.len() != QUEUE_SIZES.len() {
196 return Err(anyhow!(
197 "Failed to activate vsock device. queues.len(): {} != {}",
198 queues.len(),
199 QUEUE_SIZES.len(),
200 ));
201 }
202
203 let (rx_queue, rx_queue_evt) = queues.remove(0);
204 let (tx_queue, tx_queue_evt) = queues.remove(0);
205 let (event_queue, event_queue_evt) = queues.remove(0);
206
207 let host_guid = self.host_guid.clone();
208 let guest_cid = self.guest_cid;
209 self.worker_thread = Some(WorkerThread::start(
210 "userspace_virtio_vsock",
211 move |kill_evt| {
212 let mut worker = Worker::new(mem, interrupt, host_guid, guest_cid);
213 let result = worker.run(
214 rx_queue,
215 tx_queue,
216 event_queue,
217 rx_queue_evt,
218 tx_queue_evt,
219 event_queue_evt,
220 kill_evt,
221 );
222
223 if let Err(e) = result {
224 error!("userspace vsock worker thread exited with error: {:?}", e);
225 }
226 },
227 ));
228
229 Ok(())
230 }
231 }
232
233 impl Suspendable for Vsock {}
234
235 #[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)]
236 pub struct PortPair {
237 host: u32,
238 guest: u32,
239 }
240
241 impl Display for PortPair {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result242 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
243 write!(f, "(host port: {}, guest port: {})", self.host, self.guest)
244 }
245 }
246
247 impl PortPair {
from_tx_header(header: &virtio_vsock_hdr) -> PortPair248 fn from_tx_header(header: &virtio_vsock_hdr) -> PortPair {
249 PortPair {
250 host: header.dst_port.to_native(),
251 guest: header.src_port.to_native(),
252 }
253 }
254 }
255
256 // Note: variables herein do not have to be atomic because this struct is guarded
257 // by a Mutex.
258 struct VsockConnection {
259 // The guest port.
260 guest_port: Le32,
261
262 // The actual named (asynchronous) pipe connection.
263 pipe: PipeConnection,
264 // The overlapped struct contains an event object for the named pipe.
265 // This lets us select() on the pipes by waiting on the events.
266 // This is for Reads only.
267 overlapped: Box<OverlappedWrapper>,
268 // Read buffer for the named pipes. These are needed because reads complete
269 // asynchronously.
270 buffer: Box<[u8; TEMP_READ_BUF_SIZE_BYTES]>,
271
272 // Total free-running count of received bytes.
273 recv_cnt: usize,
274
275 // Total free-running count of received bytes that the peer has been informed of.
276 prev_recv_cnt: usize,
277
278 // Total auxiliary buffer space available to receive packets from the driver, not including
279 // the virtqueue itself. For us, this is tx buffer on the named pipe into which we drain packets
280 // for the connection. Note that if the named pipe has a grow on demand TX buffer, we use
281 // DEFAULT_BUF_ALLOC instead.
282 buf_alloc: usize,
283
284 // Peer (driver) total free-running count of received bytes.
285 peer_recv_cnt: usize,
286
287 // Peer (driver) total rx buffer allocated.
288 peer_buf_alloc: usize,
289
290 // Total free-running count of transmitted bytes.
291 tx_cnt: usize,
292
293 // State tracking for full buffer condition. Currently just used for logging. If the peer's
294 // buffer does not have space for a maximum-sized message (TEMP_READ_BUF_SIZE_BYTES), this
295 // gets set to `true`. Once there's enough space in the buffer, this gets unset.
296 is_buffer_full: bool,
297 }
298
299 struct Worker {
300 mem: GuestMemory,
301 interrupt: Interrupt,
302 host_guid: Option<String>,
303 guest_cid: u64,
304 // Map of host port to a VsockConnection.
305 connections: Mutex<HashMap<PortPair, VsockConnection>>,
306 connection_event: Event,
307 }
308
309 impl Worker {
new( mem: GuestMemory, interrupt: Interrupt, host_guid: Option<String>, guest_cid: u64, ) -> Worker310 fn new(
311 mem: GuestMemory,
312 interrupt: Interrupt,
313 host_guid: Option<String>,
314 guest_cid: u64,
315 ) -> Worker {
316 Worker {
317 mem,
318 interrupt,
319 host_guid,
320 guest_cid,
321 connections: Mutex::new(HashMap::new()),
322 connection_event: Event::new().unwrap(),
323 }
324 }
325
process_rx_queue( &self, recv_queue: Arc<Mutex<Queue>>, mut rx_queue_evt: EventAsync, ex: &Executor, ) -> Result<()>326 async fn process_rx_queue(
327 &self,
328 recv_queue: Arc<Mutex<Queue>>,
329 mut rx_queue_evt: EventAsync,
330 ex: &Executor,
331 ) -> Result<()> {
332 'connections_changed: loop {
333 // Run continuously until exit evt
334
335 // TODO(b/200810561): Optimize this FuturesUnordered code.
336 // Set up the EventAsyncs to select on
337 let futures = FuturesUnordered::new();
338 // This needs to be its own scope since it holds a Mutex on `self.connections`.
339 {
340 let connections = self.connections.read_lock().await;
341 for (port, connection) in connections.iter() {
342 let h_evt = connection
343 .overlapped
344 .get_h_event_ref()
345 .ok_or_else(|| {
346 error!("Missing h_event in OverlappedWrapper.");
347 VsockError::BadOverlappedWrapper
348 })
349 .unwrap()
350 .try_clone()
351 .map_err(|e| {
352 error!("Could not clone h_event.");
353 VsockError::CloneDescriptor(e)
354 })?;
355 let evt_async = EventAsync::new(h_evt, ex).map_err(|e| {
356 error!("Could not create EventAsync.");
357 VsockError::CreateEventAsync(e)
358 })?;
359 futures.push(wait_event_and_return_port_pair(evt_async, *port));
360 }
361 }
362 let connection_evt_clone = self.connection_event.try_clone().map_err(|e| {
363 error!("Could not clone connection_event.");
364 VsockError::CloneDescriptor(e)
365 })?;
366 let connection_evt_async = EventAsync::new(connection_evt_clone, ex).map_err(|e| {
367 error!("Could not create EventAsync.");
368 VsockError::CreateEventAsync(e)
369 })?;
370 futures.push(wait_event_and_return_port_pair(
371 connection_evt_async,
372 PortPair {
373 host: CONNECTION_EVENT_PORT_NUM,
374 guest: 0,
375 },
376 ));
377
378 // Wait to service the sockets. Note that for fairness, it is critical that we service
379 // all ready sockets in a single wakeup to avoid starvation. This is why ready_chunks
380 // is used, as it returns all currently *ready* futures from the stream.
381 //
382 // The expect here only triggers if the FuturesUnordered stream is exhausted. This never
383 // happens because it has at least one item, and we only request chunks once.
384 let futures_len = futures.len();
385 for port in futures
386 .ready_chunks(futures_len)
387 .next()
388 .await
389 .expect("failed to wait on vsock sockets")
390 {
391 if port.host == CONNECTION_EVENT_PORT_NUM {
392 // New connection event. Setup futures again.
393 if let Err(e) = self.connection_event.reset() {
394 error!("vsock: port: {}: could not reset connection_event.", port);
395 return Err(VsockError::ResetEventObject(e));
396 }
397 continue 'connections_changed;
398 }
399 let mut connections = self.connections.lock().await;
400 let connection = if let Some(conn) = connections.get_mut(&port) {
401 conn
402 } else {
403 // We could have been scheduled to run the rx queue *before* the connection was
404 // closed. In that case, we do nothing. The code which closed the connection
405 // (e.g. in response to a message in the tx/rx queues) will handle notifying
406 // the guest/host as required.
407 continue 'connections_changed;
408 };
409
410 // Check if the peer has enough space in their buffer to
411 // receive the maximum amount of data that we could possibly
412 // read from the host pipe. If not, we should continue to
413 // process other sockets as each socket has an independent
414 // buffer.
415 let peer_free_buf_size =
416 connection.peer_buf_alloc - (connection.tx_cnt - connection.peer_recv_cnt);
417 if peer_free_buf_size < TEMP_READ_BUF_SIZE_BYTES {
418 if !connection.is_buffer_full {
419 warn!(
420 "vsock: port {}: Peer has insufficient free buffer space ({} bytes available)",
421 port, peer_free_buf_size
422 );
423 connection.is_buffer_full = true;
424 }
425 continue;
426 } else if connection.is_buffer_full {
427 connection.is_buffer_full = false;
428 }
429
430 let pipe_connection = &mut connection.pipe;
431 let overlapped = &mut connection.overlapped;
432 let guest_port = connection.guest_port;
433 let buffer = &mut connection.buffer;
434
435 match overlapped.get_h_event_ref() {
436 Some(h_event) => {
437 if let Err(e) = h_event.reset() {
438 error!(
439 "vsock: port: {}: Could not reset event in OverlappedWrapper.",
440 port
441 );
442 return Err(VsockError::ResetEventObject(e));
443 }
444 }
445 None => {
446 error!(
447 "vsock: port: {}: missing h_event in OverlappedWrapper.",
448 port
449 );
450 return Err(VsockError::BadOverlappedWrapper);
451 }
452 }
453
454 let data_size = match pipe_connection.get_overlapped_result(&mut *overlapped) {
455 Ok(size) => size as usize,
456 Err(e) => {
457 error!("vsock: port {}: Failed to read from pipe {}", port, e);
458 // TODO(b/237278629): Close the connection if we fail to read.
459 continue 'connections_changed;
460 }
461 };
462
463 let response_header = virtio_vsock_hdr {
464 src_cid: 2.into(), // Host CID
465 dst_cid: self.guest_cid.into(), // Guest CID
466 src_port: Le32::from(port.host),
467 dst_port: guest_port,
468 len: Le32::from(data_size as u32),
469 r#type: TYPE_STREAM_SOCKET.into(),
470 op: vsock_op::VIRTIO_VSOCK_OP_RW.into(),
471 buf_alloc: Le32::from(connection.buf_alloc as u32),
472 fwd_cnt: Le32::from(connection.recv_cnt as u32),
473 ..Default::default()
474 };
475
476 connection.prev_recv_cnt = connection.recv_cnt;
477
478 // We have to only write to the queue once, so we construct a new buffer
479 // with the concatenated header and data.
480 const HEADER_SIZE: usize = std::mem::size_of::<virtio_vsock_hdr>();
481 let data_read = &buffer[..data_size];
482 let mut header_and_data = vec![0u8; HEADER_SIZE + data_size];
483 header_and_data[..HEADER_SIZE].copy_from_slice(response_header.as_bytes());
484 header_and_data[HEADER_SIZE..].copy_from_slice(data_read);
485 self.write_bytes_to_queue(
486 &mut *recv_queue.lock().await,
487 &mut rx_queue_evt,
488 &header_and_data[..],
489 )
490 .await?;
491
492 connection.tx_cnt += data_size;
493
494 // Start reading again so we receive the message and
495 // event signal immediately.
496
497 // Unsafe because the read could happen at any time
498 // after this function is called. We ensure safety
499 // by allocating the buffer and overlapped struct
500 // on the heap.
501 unsafe {
502 match pipe_connection.read_overlapped(&mut buffer[..], &mut *overlapped) {
503 Ok(()) => {}
504 Err(e) => {
505 error!("vsock: port {}: Failed to read from pipe {}", port, e);
506 }
507 }
508 }
509 }
510 }
511 }
512
process_tx_queue( &self, mut queue: Queue, mut queue_evt: EventAsync, mut process_packets_queue: mpsc::Sender<(virtio_vsock_hdr, Vec<u8>)>, ) -> Result<()>513 async fn process_tx_queue(
514 &self,
515 mut queue: Queue,
516 mut queue_evt: EventAsync,
517 mut process_packets_queue: mpsc::Sender<(virtio_vsock_hdr, Vec<u8>)>,
518 ) -> Result<()> {
519 loop {
520 // Run continuously until exit evt
521 let (mut reader, index) = self.get_next(&mut queue, &mut queue_evt).await?;
522 while reader.available_bytes() >= std::mem::size_of::<virtio_vsock_hdr>() {
523 let header = match reader.read_obj::<virtio_vsock_hdr>() {
524 Ok(hdr) => hdr,
525 Err(e) => {
526 error!("vsock: Error while reading header: {}", e);
527 break;
528 }
529 };
530
531 let len = header.len.to_native() as usize;
532 if reader.available_bytes() < len {
533 error!("vsock: Error reading packet data");
534 break;
535 }
536
537 let mut data = vec![0_u8; len];
538 if len > 0 {
539 if let Err(e) = reader.read_exact(&mut data) {
540 error!("vosck: failed to read data from tx packet: {:?}", e);
541 }
542 }
543
544 if let Err(e) = process_packets_queue.send((header, data)).await {
545 error!(
546 "Error while sending packet to queue, dropping packet: {:?}",
547 e
548 )
549 };
550 }
551
552 queue.add_used(&self.mem, index, 0);
553 queue.trigger_interrupt(&self.mem, &self.interrupt);
554 }
555 }
556
calculate_buf_alloc_from_pipe(pipe: &PipeConnection, port: PortPair) -> usize557 fn calculate_buf_alloc_from_pipe(pipe: &PipeConnection, port: PortPair) -> usize {
558 match pipe.get_info(/* is_server_connection= */ false) {
559 Ok(info) => {
560 if info.outgoing_buffer_size > 0 {
561 info.outgoing_buffer_size as usize
562 } else {
563 info!(
564 "vsock: port {}: using default extra rx buffer size \
565 (named pipe does not have an explicit buffer size)",
566 port
567 );
568
569 // A zero buffer size implies that the buffer grows as
570 // needed. We set our own cap here for flow control
571 // purposes.
572 DEFAULT_BUF_ALLOC_BYTES
573 }
574 }
575 Err(e) => {
576 error!(
577 "vsock: port {}: failed to get named pipe info, using default \
578 buf size. Error: {}",
579 port, e
580 );
581 DEFAULT_BUF_ALLOC_BYTES
582 }
583 }
584 }
585
586 /// Processes a connection request and returns whether to return a response (true), or reset
587 /// (false).
handle_vsock_connection_request(&self, header: virtio_vsock_hdr) -> bool588 async fn handle_vsock_connection_request(&self, header: virtio_vsock_hdr) -> bool {
589 let port = PortPair::from_tx_header(&header);
590 info!("vsock: Received connection request for port {}", port);
591
592 if self.connections.read_lock().await.contains_key(&port) {
593 // Connection exists, nothing for us to do.
594 warn!(
595 "vsock: accepting connection request on already connected port {}",
596 port
597 );
598 return true;
599 }
600
601 if self.host_guid.is_none() {
602 error!(
603 "vsock: Cannot accept guest-initiated connections \
604 without host-guid, rejecting connection"
605 );
606 return false;
607 }
608
609 // We have a new connection to establish.
610 let mut overlapped_wrapper =
611 Box::new(OverlappedWrapper::new(/* include_event= */ true).unwrap());
612 let pipe_result = named_pipes::create_client_pipe(
613 get_pipe_name(
614 self.host_guid.as_ref().unwrap(),
615 header.dst_port.to_native(),
616 )
617 .as_str(),
618 &FramingMode::Byte,
619 &BlockingMode::Wait,
620 true, /* overlapped */
621 );
622
623 match pipe_result {
624 Ok(mut pipe_connection) => {
625 let mut buffer = Box::new([0u8; TEMP_READ_BUF_SIZE_BYTES]);
626
627 // Unsafe because the read could happen at any time
628 // after this function is called. We ensure safety
629 // by allocating the buffer and overlapped struct
630 // on the heap.
631 unsafe {
632 match pipe_connection.read_overlapped(&mut buffer[..], &mut overlapped_wrapper)
633 {
634 Ok(()) => {}
635 Err(e) => {
636 error!("vsock: port {}: Failed to read from pipe {}", port, e);
637 return false;
638 }
639 }
640 }
641
642 let buf_alloc = Self::calculate_buf_alloc_from_pipe(&pipe_connection, port);
643 let connection = VsockConnection {
644 guest_port: header.src_port,
645 pipe: pipe_connection,
646 overlapped: overlapped_wrapper,
647 peer_buf_alloc: header.buf_alloc.to_native() as usize,
648 peer_recv_cnt: header.fwd_cnt.to_native() as usize,
649 buf_alloc,
650 buffer,
651 // The connection has just been made, so we haven't received
652 // anything yet.
653 recv_cnt: 0_usize,
654 prev_recv_cnt: 0_usize,
655 tx_cnt: 0_usize,
656 is_buffer_full: false,
657 };
658 self.connections.lock().await.insert(port, connection);
659 self.connection_event.signal().unwrap_or_else(|_| {
660 panic!(
661 "Failed to signal new connection event for vsock port {}.",
662 port
663 )
664 });
665 true
666 }
667 Err(e) => {
668 info!(
669 "vsock: No waiting pipe connection on port {}, \
670 not connecting (err: {:?})",
671 port, e
672 );
673 false
674 }
675 }
676 }
677
handle_vsock_guest_data( &self, header: virtio_vsock_hdr, data: &[u8], ex: &Executor, ) -> Result<()>678 async fn handle_vsock_guest_data(
679 &self,
680 header: virtio_vsock_hdr,
681 data: &[u8],
682 ex: &Executor,
683 ) -> Result<()> {
684 let port = PortPair::from_tx_header(&header);
685 let mut overlapped_wrapper = OverlappedWrapper::new(/* include_event= */ true).unwrap();
686 {
687 let mut connections = self.connections.lock().await;
688 if let Some(connection) = connections.get_mut(&port) {
689 // Update peer buffer/recv counters
690 connection.peer_recv_cnt = header.fwd_cnt.to_native() as usize;
691 connection.peer_buf_alloc = header.buf_alloc.to_native() as usize;
692
693 let pipe = &mut connection.pipe;
694 // We have to provide a OVERLAPPED struct to write to the pipe.
695 //
696 // SAFETY: safe because data & overlapped_wrapper live until the
697 // overlapped operation completes (we wait on completion below).
698 if let Err(e) = unsafe { pipe.write_overlapped(data, &mut overlapped_wrapper) } {
699 return Err(VsockError::WriteFailed(port, e));
700 }
701 } else {
702 error!(
703 "vsock: Guest attempted to send data packet over unconnected \
704 port ({}), dropping packet",
705 port
706 );
707 return Ok(());
708 }
709 }
710 if let Some(write_completed_event) = overlapped_wrapper.get_h_event_ref() {
711 // Don't block the executor while the write completes. This time should
712 // always be negligible, but will sometimes be non-zero in cases where
713 // traffic is high on the NamedPipe, especially a duplex pipe.
714 if let Ok(cloned_event) = write_completed_event.try_clone() {
715 if let Ok(async_event) = EventAsync::new_without_reset(cloned_event, ex) {
716 let _ = async_event.next_val().await;
717 } else {
718 error!(
719 "vsock: port {}: Failed to convert write event to async",
720 port
721 );
722 }
723 } else {
724 error!(
725 "vsock: port {}: Failed to clone write completion event",
726 port
727 );
728 }
729 } else {
730 error!(
731 "vsock: port: {}: Failed to get overlapped event for write",
732 port
733 );
734 }
735
736 let mut connections = self.connections.lock().await;
737 if let Some(connection) = connections.get_mut(&port) {
738 let pipe = &mut connection.pipe;
739 match pipe.get_overlapped_result(&mut overlapped_wrapper) {
740 Ok(len) => {
741 // We've received bytes from the guest, account for them in our
742 // received bytes counter.
743 connection.recv_cnt += len as usize;
744
745 if len != data.len() as u32 {
746 return Err(VsockError::WriteFailed(
747 port,
748 std::io::Error::new(
749 std::io::ErrorKind::Other,
750 format!(
751 "failed to write correct number of bytes:
752 (expected: {}, wrote: {})",
753 data.len(),
754 len
755 ),
756 ),
757 ));
758 }
759 }
760 Err(e) => {
761 return Err(VsockError::WriteFailed(port, e));
762 }
763 }
764 } else {
765 error!(
766 "vsock: Guest attempted to send data packet over unconnected \
767 port ({}), dropping packet",
768 port
769 );
770 }
771 Ok(())
772 }
773
process_tx_packets( &self, send_queue: &Arc<Mutex<Queue>>, rx_queue_evt: Event, mut packet_recv_queue: mpsc::Receiver<(virtio_vsock_hdr, Vec<u8>)>, ex: &Executor, )774 async fn process_tx_packets(
775 &self,
776 send_queue: &Arc<Mutex<Queue>>,
777 rx_queue_evt: Event,
778 mut packet_recv_queue: mpsc::Receiver<(virtio_vsock_hdr, Vec<u8>)>,
779 ex: &Executor,
780 ) {
781 let mut packet_queues = HashMap::new();
782 let mut futures = FuturesUnordered::new();
783 // Push a pending future that will never complete into FuturesUnordered.
784 // This will keep us from spinning on spurious notifications when we
785 // don't have any open connections.
786 futures.push(std::future::pending::<PortPair>().left_future());
787 loop {
788 let (new_packet, connection) = select2(packet_recv_queue.next(), futures.next()).await;
789 match connection {
790 SelectResult::Finished(Some(port)) => {
791 packet_queues.remove(&port);
792 }
793 SelectResult::Finished(_) => {
794 // This is only triggered when FuturesUnordered completes
795 // all pending futures. Right now, this can never happen, as
796 // we have a pending future that we push that will never
797 // complete.
798 }
799 SelectResult::Pending(_) => {
800 // Nothing to do.
801 }
802 };
803 match new_packet {
804 SelectResult::Finished(Some(packet)) => {
805 let port = PortPair::from_tx_header(&packet.0);
806 let queue = packet_queues.entry(port).or_insert_with(|| {
807 let (send, recv) = mpsc::channel(CHANNEL_SIZE);
808 let event_async = EventAsync::new(
809 rx_queue_evt.try_clone().expect("Failed to clone event"),
810 ex,
811 )
812 .expect("Failed to set up the rx queue event");
813 futures.push(
814 self.process_tx_packets_for_port(
815 port,
816 recv,
817 send_queue,
818 event_async,
819 ex,
820 )
821 .right_future(),
822 );
823 send
824 });
825 // Try to send the packet. Do not block other ports if the queue is full.
826 if let Err(e) = queue.try_send(packet) {
827 error!("Error sending packet to queue, dropping packet: {:?}", e)
828 }
829 }
830 SelectResult::Finished(_) => {
831 // Triggers when the channel is closed; no more packets coming.
832 packet_recv_queue.close();
833 return;
834 }
835 SelectResult::Pending(_) => {
836 // Nothing to do.
837 }
838 }
839 }
840 }
841
process_tx_packets_for_port( &self, port: PortPair, mut packet_recv_queue: mpsc::Receiver<(virtio_vsock_hdr, Vec<u8>)>, send_queue: &Arc<Mutex<Queue>>, mut rx_queue_evt: EventAsync, ex: &Executor, ) -> PortPair842 async fn process_tx_packets_for_port(
843 &self,
844 port: PortPair,
845 mut packet_recv_queue: mpsc::Receiver<(virtio_vsock_hdr, Vec<u8>)>,
846 send_queue: &Arc<Mutex<Queue>>,
847 mut rx_queue_evt: EventAsync,
848 ex: &Executor,
849 ) -> PortPair {
850 while let Some((header, data)) = packet_recv_queue.next().await {
851 if !self
852 .handle_tx_packet(header, &data, send_queue, &mut rx_queue_evt, ex)
853 .await
854 {
855 packet_recv_queue.close();
856 if let Ok(Some(_)) = packet_recv_queue.try_next() {
857 warn!("vsock: closing port {} with unprocessed packets", port);
858 } else {
859 info!("vsock: closing port {} cleanly", port)
860 }
861 break;
862 }
863 }
864 port
865 }
866
handle_tx_packet( &self, header: virtio_vsock_hdr, data: &[u8], send_queue: &Arc<Mutex<Queue>>, rx_queue_evt: &mut EventAsync, ex: &Executor, ) -> bool867 async fn handle_tx_packet(
868 &self,
869 header: virtio_vsock_hdr,
870 data: &[u8],
871 send_queue: &Arc<Mutex<Queue>>,
872 rx_queue_evt: &mut EventAsync,
873 ex: &Executor,
874 ) -> bool {
875 let mut is_open = true;
876 match header.op.to_native() {
877 vsock_op::VIRTIO_VSOCK_OP_INVALID => {
878 error!("vsock: Invalid Operation requested, dropping packet");
879 }
880 vsock_op::VIRTIO_VSOCK_OP_REQUEST => {
881 let (resp_op, buf_alloc, fwd_cnt) =
882 if self.handle_vsock_connection_request(header).await {
883 let connections = self.connections.read_lock().await;
884 let port = PortPair::from_tx_header(&header);
885
886 connections.get(&port).map_or_else(
887 || {
888 warn!("vsock: port: {} connection closed during connect", port);
889 is_open = false;
890 (vsock_op::VIRTIO_VSOCK_OP_RST, 0, 0)
891 },
892 |conn| {
893 (
894 vsock_op::VIRTIO_VSOCK_OP_RESPONSE,
895 conn.buf_alloc as u32,
896 conn.recv_cnt as u32,
897 )
898 },
899 )
900 } else {
901 is_open = false;
902 (vsock_op::VIRTIO_VSOCK_OP_RST, 0, 0)
903 };
904
905 let response_header = virtio_vsock_hdr {
906 src_cid: { header.dst_cid },
907 dst_cid: { header.src_cid },
908 src_port: { header.dst_port },
909 dst_port: { header.src_port },
910 len: 0.into(),
911 r#type: TYPE_STREAM_SOCKET.into(),
912 op: resp_op.into(),
913 buf_alloc: Le32::from(buf_alloc),
914 fwd_cnt: Le32::from(fwd_cnt),
915 ..Default::default()
916 };
917 // Safe because virtio_vsock_hdr is a simple data struct and converts cleanly to
918 // bytes.
919 self.write_bytes_to_queue(
920 &mut *send_queue.lock().await,
921 rx_queue_evt,
922 response_header.as_bytes(),
923 )
924 .await
925 .expect("vsock: failed to write to queue");
926 }
927 vsock_op::VIRTIO_VSOCK_OP_RESPONSE => {
928 // TODO(b/237811512): Implement this for host-initiated connections
929 }
930 vsock_op::VIRTIO_VSOCK_OP_RST => {
931 // TODO(b/237811512): Implement this for host-initiated connections
932 }
933 vsock_op::VIRTIO_VSOCK_OP_SHUTDOWN => {
934 // While the header provides flags to specify tx/rx-specific shutdown,
935 // we only support full shutdown.
936 // TODO(b/237811512): Provide an optimal way to notify host of shutdowns
937 // while still maintaining easy reconnections.
938 let port = PortPair::from_tx_header(&header);
939 let mut connections = self.connections.lock().await;
940 if connections.remove(&port).is_some() {
941 let mut response = virtio_vsock_hdr {
942 src_cid: { header.dst_cid },
943 dst_cid: { header.src_cid },
944 src_port: { header.dst_port },
945 dst_port: { header.src_port },
946 len: 0.into(),
947 r#type: TYPE_STREAM_SOCKET.into(),
948 op: vsock_op::VIRTIO_VSOCK_OP_RST.into(),
949 // There is no buffer on a closed connection
950 buf_alloc: 0.into(),
951 // There is no fwd_cnt anymore on a closed connection
952 fwd_cnt: 0.into(),
953 ..Default::default()
954 };
955 // Safe because virtio_vsock_hdr is a simple data struct and converts cleanly to bytes
956 self.write_bytes_to_queue(
957 &mut *send_queue.lock().await,
958 rx_queue_evt,
959 response.as_bytes_mut(),
960 )
961 .await
962 .expect("vsock: failed to write to queue");
963 self.connection_event
964 .signal()
965 .expect("vsock: failed to write to event");
966 } else {
967 error!("vsock: Attempted to close unopened port: {}", port);
968 }
969 is_open = false;
970 }
971 vsock_op::VIRTIO_VSOCK_OP_RW => {
972 match self.handle_vsock_guest_data(header, data, ex).await {
973 Ok(()) => {
974 if self
975 .check_free_buffer_threshold(header)
976 .await
977 .unwrap_or(false)
978 {
979 // Send a credit update if we're below the minimum free
980 // buffer size. We skip this if the connection is closed,
981 // which could've happened if we were closed on the other
982 // end.
983 info!("vsock: Buffer below threshold; sending credit update.");
984 self.send_vsock_credit_update(send_queue, rx_queue_evt, header)
985 .await;
986 }
987 }
988 Err(e) => {
989 error!("vsock: resetting connection: {}", e);
990 self.send_vsock_reset(send_queue, rx_queue_evt, header)
991 .await;
992 is_open = false;
993 }
994 }
995 }
996 // An update from our peer with their buffer state, which they are sending
997 // (probably) due to a a credit request *we* made.
998 vsock_op::VIRTIO_VSOCK_OP_CREDIT_UPDATE => {
999 let port = PortPair::from_tx_header(&header);
1000 let mut connections = self.connections.lock().await;
1001 if let Some(connection) = connections.get_mut(&port) {
1002 connection.peer_recv_cnt = header.fwd_cnt.to_native() as usize;
1003 connection.peer_buf_alloc = header.buf_alloc.to_native() as usize;
1004 } else {
1005 error!("vsock: got credit update on unknown port {}", port);
1006 is_open = false;
1007 }
1008 }
1009 // A request from our peer to get *our* buffer state. We reply to the RX queue.
1010 vsock_op::VIRTIO_VSOCK_OP_CREDIT_REQUEST => {
1011 info!(
1012 "vsock: Got credit request from peer {}; sending credit update.",
1013 PortPair::from_tx_header(&header)
1014 );
1015 self.send_vsock_credit_update(send_queue, rx_queue_evt, header)
1016 .await;
1017 }
1018 _ => {
1019 error!("vsock: Unknown operation requested, dropping packet");
1020 }
1021 }
1022 is_open
1023 }
1024
1025 // Checks if how much free buffer our peer thinks that *we* have available
1026 // is below our threshold percentage. If the connection is closed, returns `None`.
check_free_buffer_threshold(&self, header: virtio_vsock_hdr) -> Option<bool>1027 async fn check_free_buffer_threshold(&self, header: virtio_vsock_hdr) -> Option<bool> {
1028 let mut connections = self.connections.lock().await;
1029 let port = PortPair::from_tx_header(&header);
1030 connections.get_mut(&port).map(|connection| {
1031 let threshold: usize = (MIN_FREE_BUFFER_PCT * connection.buf_alloc as f64) as usize;
1032 connection.buf_alloc - (connection.recv_cnt - connection.prev_recv_cnt) < threshold
1033 })
1034 }
1035
send_vsock_credit_update( &self, send_queue: &Arc<Mutex<Queue>>, rx_queue_evt: &mut EventAsync, header: virtio_vsock_hdr, )1036 async fn send_vsock_credit_update(
1037 &self,
1038 send_queue: &Arc<Mutex<Queue>>,
1039 rx_queue_evt: &mut EventAsync,
1040 header: virtio_vsock_hdr,
1041 ) {
1042 let mut connections = self.connections.lock().await;
1043 let port = PortPair::from_tx_header(&header);
1044
1045 if let Some(connection) = connections.get_mut(&port) {
1046 let mut response = virtio_vsock_hdr {
1047 src_cid: { header.dst_cid },
1048 dst_cid: { header.src_cid },
1049 src_port: { header.dst_port },
1050 dst_port: { header.src_port },
1051 len: 0.into(),
1052 r#type: TYPE_STREAM_SOCKET.into(),
1053 op: vsock_op::VIRTIO_VSOCK_OP_CREDIT_UPDATE.into(),
1054 buf_alloc: Le32::from(connection.buf_alloc as u32),
1055 fwd_cnt: Le32::from(connection.recv_cnt as u32),
1056 ..Default::default()
1057 };
1058
1059 connection.prev_recv_cnt = connection.recv_cnt;
1060
1061 // Safe because virtio_vsock_hdr is a simple data struct and converts cleanly
1062 // to bytes
1063 self.write_bytes_to_queue(
1064 &mut *send_queue.lock().await,
1065 rx_queue_evt,
1066 response.as_bytes_mut(),
1067 )
1068 .await
1069 .expect("vsock: failed to write to queue");
1070 } else {
1071 error!(
1072 "vsock: error sending credit update on unknown port {}",
1073 port
1074 );
1075 }
1076 }
1077
send_vsock_reset( &self, send_queue: &Arc<Mutex<Queue>>, rx_queue_evt: &mut EventAsync, header: virtio_vsock_hdr, )1078 async fn send_vsock_reset(
1079 &self,
1080 send_queue: &Arc<Mutex<Queue>>,
1081 rx_queue_evt: &mut EventAsync,
1082 header: virtio_vsock_hdr,
1083 ) {
1084 let mut connections = self.connections.lock().await;
1085 let port = PortPair::from_tx_header(&header);
1086 if let Some(connection) = connections.remove(&port) {
1087 let mut response = virtio_vsock_hdr {
1088 src_cid: { header.dst_cid },
1089 dst_cid: { header.src_cid },
1090 src_port: { header.dst_port },
1091 dst_port: { header.src_port },
1092 len: 0.into(),
1093 r#type: TYPE_STREAM_SOCKET.into(),
1094 op: vsock_op::VIRTIO_VSOCK_OP_RST.into(),
1095 buf_alloc: Le32::from(connection.buf_alloc as u32),
1096 fwd_cnt: Le32::from(connection.recv_cnt as u32),
1097 ..Default::default()
1098 };
1099
1100 // Safe because virtio_vsock_hdr is a simple data struct and converts cleanly
1101 // to bytes
1102 self.write_bytes_to_queue(
1103 &mut *send_queue.lock().await,
1104 rx_queue_evt,
1105 response.as_bytes_mut(),
1106 )
1107 .await
1108 .expect("failed to write to queue");
1109 } else {
1110 error!("vsock: error closing unknown port {}", port);
1111 }
1112 }
1113
1114 // Get a `Writer`, or wait if there's no descriptors currently available on
1115 // the queue.
get_next_writer( &self, queue: &mut Queue, queue_evt: &mut EventAsync, ) -> Result<(Writer, u16)>1116 async fn get_next_writer(
1117 &self,
1118 queue: &mut Queue,
1119 queue_evt: &mut EventAsync,
1120 ) -> Result<(Writer, u16)> {
1121 let avail_desc = match queue.next_async(&self.mem, queue_evt).await {
1122 Ok(d) => d,
1123 Err(e) => {
1124 error!("vsock: Failed to read descriptor {}", e);
1125 return Err(VsockError::AwaitQueue(e));
1126 }
1127 };
1128 let index = avail_desc.index;
1129 Writer::new(self.mem.clone(), avail_desc)
1130 .map_err(|e| {
1131 error!("vsock: failed to create Writer: {}", e);
1132 VsockError::CreateWriter(e)
1133 })
1134 .map(|r| (r, index))
1135 }
1136
write_bytes_to_queue( &self, queue: &mut Queue, queue_evt: &mut EventAsync, bytes: &[u8], ) -> Result<()>1137 async fn write_bytes_to_queue(
1138 &self,
1139 queue: &mut Queue,
1140 queue_evt: &mut EventAsync,
1141 bytes: &[u8],
1142 ) -> Result<()> {
1143 let (mut writer, desc_index) = self.get_next_writer(queue, queue_evt).await?;
1144 let res = writer.write_all(bytes);
1145
1146 if let Err(e) = res {
1147 error!(
1148 "vsock: failed to write {} bytes to queue, err: {:?}",
1149 bytes.len(),
1150 e
1151 );
1152 return Err(VsockError::WriteQueue(e));
1153 }
1154
1155 let bytes_written = writer.bytes_written() as u32;
1156 if bytes_written > 0 {
1157 queue.add_used(&self.mem, desc_index, bytes_written);
1158 queue.trigger_interrupt(&self.mem, &self.interrupt);
1159 Ok(())
1160 } else {
1161 error!("vsock: Failed to write bytes to queue");
1162 Err(VsockError::WriteQueue(std::io::Error::new(
1163 std::io::ErrorKind::Other,
1164 "failed to write bytes to queue",
1165 )))
1166 }
1167 }
1168
process_event_queue(&self, mut queue: Queue, mut queue_evt: EventAsync) -> Result<()>1169 async fn process_event_queue(&self, mut queue: Queue, mut queue_evt: EventAsync) -> Result<()> {
1170 loop {
1171 // Log but don't act on events. They are reserved exclusively for guest migration events
1172 // resulting in CID resets, which we don't support.
1173 let (mut reader, _index) = self.get_next(&mut queue, &mut queue_evt).await?;
1174 for event in reader.iter::<virtio_vsock_event>() {
1175 if event.is_ok() {
1176 error!(
1177 "Received event with id {:?}, this should not happen, and will not be handled",
1178 event.unwrap().id.to_native()
1179 );
1180 }
1181 }
1182 }
1183 }
1184
get_next( &self, queue: &mut Queue, queue_evt: &mut EventAsync, ) -> Result<(Reader, u16)>1185 async fn get_next(
1186 &self,
1187 queue: &mut Queue,
1188 queue_evt: &mut EventAsync,
1189 ) -> Result<(Reader, u16)> {
1190 let avail_desc = match queue.next_async(&self.mem, queue_evt).await {
1191 Ok(d) => d,
1192 Err(e) => {
1193 error!("vsock: Failed to read descriptor {}", e);
1194 return Err(VsockError::AwaitQueue(e));
1195 }
1196 };
1197 let index = avail_desc.index;
1198 Reader::new(self.mem.clone(), avail_desc)
1199 .map_err(|e| {
1200 error!("vsock: failed to create Reader: {}", e);
1201 VsockError::CreateReader(e)
1202 })
1203 .map(|r| (r, index))
1204 }
1205
run( &mut self, rx_queue: Queue, tx_queue: Queue, event_queue: Queue, rx_queue_evt: Event, tx_queue_evt: Event, event_queue_evt: Event, kill_evt: Event, ) -> Result<()>1206 fn run(
1207 &mut self,
1208 rx_queue: Queue,
1209 tx_queue: Queue,
1210 event_queue: Queue,
1211 rx_queue_evt: Event,
1212 tx_queue_evt: Event,
1213 event_queue_evt: Event,
1214 kill_evt: Event,
1215 ) -> Result<()> {
1216 let ex = Executor::new().unwrap();
1217
1218 // Note that this mutex won't ever be contended because the HandleExecutor is single
1219 // threaded. We need the mutex for compile time correctness, but technically it is not
1220 // actually providing mandatory locking, at least not at the moment. If we later use a
1221 // multi-threaded executor, then this lock will be important.
1222 let rx_queue_arc = Arc::new(Mutex::new(rx_queue));
1223
1224 let rx_evt_async = EventAsync::new(
1225 rx_queue_evt
1226 .try_clone()
1227 .map_err(VsockError::CloneDescriptor)?,
1228 &ex,
1229 )
1230 .expect("Failed to set up the rx queue event");
1231 let rx_handler = self.process_rx_queue(rx_queue_arc.clone(), rx_evt_async, &ex);
1232 pin_mut!(rx_handler);
1233
1234 let (send, recv) = mpsc::channel(CHANNEL_SIZE);
1235
1236 let tx_evt_async =
1237 EventAsync::new(tx_queue_evt, &ex).expect("Failed to set up the tx queue event");
1238 let tx_handler = self.process_tx_queue(tx_queue, tx_evt_async, send);
1239 pin_mut!(tx_handler);
1240
1241 let packet_handler = self.process_tx_packets(&rx_queue_arc, rx_queue_evt, recv, &ex);
1242 pin_mut!(packet_handler);
1243
1244 let event_evt_async =
1245 EventAsync::new(event_queue_evt, &ex).expect("Failed to set up the event queue event");
1246 let event_handler = self.process_event_queue(event_queue, event_evt_async);
1247 pin_mut!(event_handler);
1248
1249 // Process any requests to resample the irq value.
1250 let resample_handler = async_utils::handle_irq_resample(&ex, self.interrupt.clone());
1251 pin_mut!(resample_handler);
1252
1253 let kill_evt = EventAsync::new(kill_evt, &ex).expect("Failed to set up the kill event");
1254 let kill_handler = kill_evt.next_val();
1255 pin_mut!(kill_handler);
1256
1257 if let Err(e) = ex.run_until(select6(
1258 rx_handler,
1259 tx_handler,
1260 packet_handler,
1261 event_handler,
1262 resample_handler,
1263 kill_handler,
1264 )) {
1265 error!("Error happened when running executor: {}", e);
1266 return Err(VsockError::RunExecutor(e));
1267 }
1268 Ok(())
1269 }
1270 }
1271
get_pipe_name(guid: &str, pipe: u32) -> String1272 fn get_pipe_name(guid: &str, pipe: u32) -> String {
1273 format!("\\\\.\\pipe\\{}\\vsock-{}", guid, pipe)
1274 }
1275
wait_event_and_return_port_pair(evt: EventAsync, pair: PortPair) -> PortPair1276 async fn wait_event_and_return_port_pair(evt: EventAsync, pair: PortPair) -> PortPair {
1277 // This doesn't reset the event since we have to call GetOverlappedResult
1278 // on the OVERLAPPED struct first before resetting it.
1279 let _ = evt.get_io_source_ref().wait_for_handle().await;
1280 pair
1281 }
1282