• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2024 Google Inc. All rights reserved
3  *
4  * Permission is hereby granted, free of charge, to any person obtaining
5  * a copy of this software and associated documentation files
6  * (the "Software"), to deal in the Software without restriction,
7  * including without limitation the rights to use, copy, modify, merge,
8  * publish, distribute, sublicense, and/or sell copies of the Software,
9  * and to permit persons to whom the Software is furnished to do so,
10  * subject to the following conditions:
11  *
12  * The above copyright notice and this permission notice shall be
13  * included in all copies or substantial portions of the Software.
14  *
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
16  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
17  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
18  * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
19  * CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
20  * TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
21  * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
22  */
23 
24 #![deny(unsafe_op_in_unsafe_fn)]
25 use core::ffi::c_void;
26 use core::ffi::CStr;
27 use core::ops::Deref;
28 use core::ops::DerefMut;
29 use core::ptr::eq;
30 use core::ptr::null_mut;
31 use core::time::Duration;
32 
33 use alloc::borrow::ToOwned;
34 use alloc::boxed::Box;
35 use alloc::ffi::CString;
36 use alloc::sync::Arc;
37 use alloc::vec;
38 use alloc::vec::Vec;
39 
40 use log::debug;
41 use log::error;
42 use log::info;
43 use log::warn;
44 
45 use rust_support::handle::IPC_HANDLE_POLL_HUP;
46 use rust_support::handle::IPC_HANDLE_POLL_MSG;
47 use rust_support::handle::IPC_HANDLE_POLL_READY;
48 use rust_support::handle::IPC_HANDLE_POLL_SEND_UNBLOCKED;
49 use rust_support::ipc::iovec_kern;
50 use rust_support::ipc::ipc_get_msg;
51 use rust_support::ipc::ipc_msg_info;
52 use rust_support::ipc::ipc_msg_kern;
53 use rust_support::ipc::ipc_port_connect_async;
54 use rust_support::ipc::ipc_put_msg;
55 use rust_support::ipc::ipc_read_msg;
56 use rust_support::ipc::ipc_send_msg;
57 use rust_support::ipc::zero_uuid;
58 use rust_support::ipc::IPC_CONNECT_WAIT_FOR_PORT;
59 use rust_support::ipc::IPC_PORT_PATH_MAX;
60 use rust_support::sync::Mutex;
61 use rust_support::thread;
62 use rust_support::thread::sleep;
63 use rust_support::thread::Builder;
64 use rust_support::thread::Priority;
65 use virtio_drivers_and_devices::device::socket::SocketError;
66 use virtio_drivers_and_devices::device::socket::VirtIOSocket;
67 use virtio_drivers_and_devices::device::socket::VsockAddr;
68 use virtio_drivers_and_devices::device::socket::VsockConnectionManager;
69 use virtio_drivers_and_devices::device::socket::VsockEvent;
70 use virtio_drivers_and_devices::device::socket::VsockEventType;
71 use virtio_drivers_and_devices::device::socket::VsockManager;
72 use virtio_drivers_and_devices::transport::Transport;
73 use virtio_drivers_and_devices::Error as VirtioError;
74 use virtio_drivers_and_devices::Hal;
75 use virtio_drivers_and_devices::PAGE_SIZE;
76 
77 use rust_support::handle::HandleRef;
78 use rust_support::handle_set::HandleSet;
79 
80 use rust_support::Error as LkError;
81 
82 use crate::err::Error;
83 
84 const ACTIVE_TIMEOUT: Duration = Duration::from_secs(5);
85 
86 struct TipcPortAcl {
87     name: &'static CStr,
88     enabled: bool,
89 }
90 
91 // macro will generate the variable containing the ACL for all tipc ports. If the feature name is
92 // defined, the corresponding port will be enabled; the port will be disabled otherwise. The macro
93 // will generate 2 extra ports for connections that send the port name in the first package for port
94 // 0 and 1.
95 macro_rules! comm_port_feature_enable {
96     ($var_name:ident[$number_ports: literal]={$({port_name: $port_name:literal, feature_name: $feature_name:literal}),+ $(,)*}) => {
97     const $var_name: [TipcPortAcl; $number_ports + 2] = [
98         TipcPortAcl { name: c"", enabled: true }, // connections on port zero must send port name in first packet
99         TipcPortAcl { name: c"", enabled: true }, // temporary workaround to not change the port 1 to port 0
100         $(
101             #[cfg(feature = $feature_name)]
102             TipcPortAcl { name: $port_name, enabled: true },
103             #[cfg(not(feature = $feature_name))]
104             TipcPortAcl { name: $port_name, enabled: false },
105         )+
106     ];
107     }
108 }
109 
110 // Mapping of vsock port numbers to tipc port names.
111 //
112 // Each tipc port name must be shorter than IPC_PORT_PATH_MAX.
113 comm_port_feature_enable! {
114     PORT_MAP[8] = {
115         {port_name: c"com.android.trusty.authmgr", feature_name: "authmgr"},
116         {port_name: c"com.android.trusty.hwcryptooperations", feature_name: "hwcrypto_hal"},
117         {port_name: c"com.android.trusty.rust.hwcryptohal.V1", feature_name: "hwcrypto_hal"},
118         {port_name: c"com.android.trusty.securestorage", feature_name: "securestorage_hal"},
119         {port_name: c"com.android.trusty.widevine.transact", feature_name: "widevine_aidl_comm"},
120         {port_name: c"com.android.trusty.storage.proxy", feature_name: "securestorage_hal"},
121         {port_name: c"com.android.trusty.gatekeeper", feature_name: "gatekeeper"},
122         {port_name: c"com.android.trusty.keymint", feature_name: "keymint"},
123     }
124 }
125 
126 #[allow(dead_code)]
127 #[derive(Clone, Copy, Debug, Default, PartialEq)]
128 enum VsockConnectionState {
129     #[default]
130     Invalid = 0,
131     VsockOnly,
132     TipcOnly,
133     TipcConnecting,
134     TipcSendBlocked,
135     Active,
136     TipcClosed,
137     Closed,
138 }
139 
140 #[derive(Default)]
141 struct VsockConnection {
142     peer: VsockAddr,
143     local_port: u32,
144     state: VsockConnectionState,
145     tipc_port_name: Option<CString>,
146     href: HandleRef,
147     tx_count: u64,
148     tx_since_rx: u64,
149     rx_count: u64,
150     rx_since_tx: u64,
151     rx_buffer: Box<[u8]>, // buffers data if the tipc connection blocks
152     rx_pending: usize,    // how many bytes to send when tipc unblocks
153 }
154 
155 impl VsockConnection {
new(peer: VsockAddr, local_port: u32) -> Self156     fn new(peer: VsockAddr, local_port: u32) -> Self {
157         // Make rx_buffer twice as large as the vsock connection rx buffer such
158         // that we can buffer pending messages if TIPC blocks.
159         //
160         // TODO: the ideal rx_buffer size depends on the connection so it might
161         // be worthwhile to dynamically re-size the buffer in response to tipc
162         // blocking or unblocking.
163         let rx_buffer_len = 2 * PAGE_SIZE;
164         Self {
165             peer,
166             local_port,
167             state: VsockConnectionState::VsockOnly,
168             tipc_port_name: None,
169             rx_buffer: vec![0u8; rx_buffer_len].into_boxed_slice(),
170             ..Default::default()
171         }
172     }
173 
tipc_port_name(&self) -> &str174     fn tipc_port_name(&self) -> &str {
175         self.tipc_port_name
176             .as_ref()
177             .map(|s| s.to_str().expect("invalid port name"))
178             .unwrap_or("(no port name)")
179     }
180 
print_stats(&self)181     fn print_stats(&self) {
182         info!(
183             "vsock: tx {:?} ({:>5?}) rx {:?} ({:>5?}) port: {}, remote {}, state {:?}",
184             self.tx_since_rx,
185             self.tx_count,
186             self.rx_since_tx,
187             self.rx_count,
188             self.tipc_port_name(),
189             self.peer.port,
190             self.state
191         );
192     }
193 
tipc_try_send(&mut self) -> Result<(), Error>194     fn tipc_try_send(&mut self) -> Result<(), Error> {
195         debug_assert!(self.rx_pending > 0 && self.rx_pending < PAGE_SIZE);
196         debug_assert!(
197             self.state == VsockConnectionState::Active
198                 || self.state == VsockConnectionState::TipcSendBlocked
199         );
200 
201         let length = self.rx_pending;
202         let mut iov = iovec_kern { iov_base: self.rx_buffer.as_mut_ptr() as _, iov_len: length };
203         let mut msg = ipc_msg_kern::new(&mut iov);
204 
205         // Safety:
206         // `c.href.handle` is a handle attached to a tipc channel.
207         // `msg` contains an `iov` which points to a buffer from which
208         // the kernel can read `iov_len` bytes.
209         let ret = unsafe { ipc_send_msg(self.href.handle(), &mut msg) };
210         if ret == LkError::ERR_NOT_ENOUGH_BUFFER.into() {
211             self.state = VsockConnectionState::TipcSendBlocked;
212             return Ok(());
213         } else if ret < 0 {
214             error!("failed to send {length} bytes to {}: {ret} ", self.tipc_port_name());
215             LkError::from_lk(ret)?;
216         } else if ret as usize != length {
217             // TODO: in streaming mode, this should not be an error. Instead, consume
218             // the data that was sent and try sending the rest in the next message.
219             error!("sent {ret} bytes but expected to send {length} bytes");
220             return Err(LkError::ERR_BAD_LEN.into());
221         }
222 
223         self.state = VsockConnectionState::Active;
224         self.tx_since_rx = 0;
225         self.rx_pending = 0;
226 
227         debug!("sent {length} bytes to {}", self.tipc_port_name());
228 
229         Ok(())
230     }
231 }
232 
233 /// The action to take after running the `f` closure in [`vsock_connection_lookup`].
234 #[derive(PartialEq, Eq)]
235 enum ConnectionStateAction {
236     /// No action needs to be taken, so the connection stays open.
237     None,
238 
239     /// TIPC has requested that the connection be closed.
240     /// This closes the connection and waits for the peer to acknowledge before removing it.
241     Close,
242 
243     /// We want to close the connection and remove it
244     /// without waiting for the peer to acknowledge it,
245     /// such as when there is an error (but also potentially other reasons).
246     Remove,
247 }
248 
vsock_connection_lookup_by( connections: &mut Vec<VsockConnection>, predicate: impl Fn(&VsockConnection) -> bool, f: impl FnOnce(&mut VsockConnection) -> ConnectionStateAction, ) -> Result<(), ()>249 fn vsock_connection_lookup_by(
250     connections: &mut Vec<VsockConnection>,
251     predicate: impl Fn(&VsockConnection) -> bool,
252     f: impl FnOnce(&mut VsockConnection) -> ConnectionStateAction,
253 ) -> Result<(), ()> {
254     let index = connections.iter().position(predicate).ok_or(())?;
255     let action = f(&mut connections[index]);
256     if action == ConnectionStateAction::None {
257         return Ok(());
258     }
259 
260     if vsock_connection_close(&mut connections[index], action) {
261         connections.swap_remove(index);
262     }
263 
264     Ok(())
265 }
266 
vsock_connection_lookup_peer( connections: &mut Vec<VsockConnection>, peer: VsockAddr, local_port: u32, f: impl FnOnce(&mut VsockConnection) -> ConnectionStateAction, ) -> Result<(), ()>267 fn vsock_connection_lookup_peer(
268     connections: &mut Vec<VsockConnection>,
269     peer: VsockAddr,
270     local_port: u32,
271     f: impl FnOnce(&mut VsockConnection) -> ConnectionStateAction,
272 ) -> Result<(), ()> {
273     vsock_connection_lookup_by(
274         connections,
275         |c: &VsockConnection| c.peer == peer && c.local_port == local_port,
276         f,
277     )
278 }
279 
vsock_connection_lookup_cookie( connections: &mut Vec<VsockConnection>, cookie: *mut c_void, f: impl FnOnce(&mut VsockConnection) -> ConnectionStateAction, ) -> Result<(), ()>280 fn vsock_connection_lookup_cookie(
281     connections: &mut Vec<VsockConnection>,
282     cookie: *mut c_void,
283     f: impl FnOnce(&mut VsockConnection) -> ConnectionStateAction,
284 ) -> Result<(), ()> {
285     vsock_connection_lookup_by(
286         connections,
287         |c: &VsockConnection| eq(c.href.as_ptr().cast::<c_void>(), cookie),
288         f,
289     )
290 }
291 
vsock_connection_close(c: &mut VsockConnection, action: ConnectionStateAction) -> bool292 fn vsock_connection_close(c: &mut VsockConnection, action: ConnectionStateAction) -> bool {
293     info!(
294         "remote_port {}, tipc_port_name {}, state {:?}",
295         c.peer.port,
296         c.tipc_port_name(),
297         c.state
298     );
299 
300     if c.state == VsockConnectionState::VsockOnly {
301         info!("tipc vsock only connection closed");
302         c.state = VsockConnectionState::TipcClosed;
303     }
304 
305     if c.state == VsockConnectionState::Active
306         || c.state == VsockConnectionState::TipcConnecting
307         || c.state == VsockConnectionState::TipcSendBlocked
308     {
309         // The handle set owns the only reference we have to the handle and
310         // handle_set_wait might have already returned a pointer to c
311         c.href.detach();
312         c.href.handle_close();
313         c.href.set_cookie(null_mut());
314         info!("tipc handle closed");
315         c.state = VsockConnectionState::TipcClosed;
316     }
317     if action == ConnectionStateAction::Remove && c.state == VsockConnectionState::TipcClosed {
318         info!("vsock closed");
319         c.state = VsockConnectionState::Closed;
320     }
321     if c.state == VsockConnectionState::Closed && c.href.cookie().is_null() {
322         info!("remove connection");
323         c.print_stats();
324         return true; // remove connection
325     }
326     false // keep connection
327 }
328 
329 pub struct VsockDevice<M>
330 where
331     M: VsockManager,
332 {
333     connections: Mutex<Vec<VsockConnection>>,
334     handle_set: HandleSet,
335     connection_manager: Mutex<M>,
336 }
337 
338 impl<M> VsockDevice<M>
339 where
340     M: VsockManager,
341 {
new(manager: M) -> Self342     pub(crate) fn new(manager: M) -> Self {
343         Self {
344             connections: Mutex::new(Vec::new()),
345             handle_set: HandleSet::new(),
346             connection_manager: Mutex::new(manager),
347         }
348     }
349 
vsock_rx_op_request(&self, peer: VsockAddr, local: VsockAddr) -> Result<(), Error>350     fn vsock_rx_op_request(&self, peer: VsockAddr, local: VsockAddr) -> Result<(), Error> {
351         debug!("dst_port {}, src_port {}", local.port, peer.port);
352 
353         // do we already have a connection?
354         let mut guard = self.connections.lock();
355         if guard
356             .deref()
357             .iter()
358             .any(|connection| connection.peer == peer && connection.local_port == local.port)
359         {
360             return Err(LkError::ERR_ALREADY_EXISTS.into());
361         };
362 
363         let mut c = VsockConnection::new(peer, local.port);
364 
365         // ports greater than 1 use port map to determine what tipc port to connect to
366         if [0, 1].contains(&local.port) {
367             // wait on peer to send tipc port name
368         } else if (local.port as usize) < PORT_MAP.len() {
369             if PORT_MAP[local.port as usize].enabled {
370                 c.tipc_port_name = Some(PORT_MAP[local.port as usize].name.to_owned());
371                 self.vsock_connect_tipc(&mut c)?;
372             } else {
373                 return Err(LkError::ERR_NOT_VALID.into());
374             }
375         } else {
376             return Err(LkError::ERR_OUT_OF_RANGE.into());
377         }
378 
379         guard.deref_mut().push(c);
380 
381         Ok(())
382     }
383 
vsock_connect_on_rx( &self, c: &mut VsockConnection, length: usize, source: VsockAddr, destination: VsockAddr, ) -> Result<(), Error>384     fn vsock_connect_on_rx(
385         &self,
386         c: &mut VsockConnection,
387         length: usize,
388         source: VsockAddr,
389         destination: VsockAddr,
390     ) -> Result<(), Error> {
391         // destination port should be zero or one, otherwise, connection should not
392         // be in VsockOnly state (not already connected/connecting to tipc).
393         assert!([0, 1].contains(&destination.port));
394 
395         let mut buffer = [0; IPC_PORT_PATH_MAX as usize];
396         assert!(length < buffer.len());
397         let mut data_len = self
398             .connection_manager
399             .lock()
400             .deref_mut()
401             .recv(source, destination.port, &mut buffer)
402             .unwrap();
403         assert!(data_len == length);
404         // allow manual connect from nc in line mode
405         if buffer[data_len - 1] == b'\n' as _ {
406             data_len -= 1;
407         }
408         let port_name = &buffer[0..data_len];
409         info!("port_name is {:?}", port_name);
410 
411         // should not contain any null bytes
412         c.tipc_port_name = CString::new(port_name).ok();
413         info!("tipc port name set to {}", c.tipc_port_name());
414 
415         self.vsock_connect_tipc(c)
416     }
417 
vsock_connect_tipc(&self, c: &mut VsockConnection) -> Result<(), Error>418     fn vsock_connect_tipc(&self, c: &mut VsockConnection) -> Result<(), Error> {
419         let port_name = c.tipc_port_name.as_ref().expect("tipc port name has been set");
420         // invariant: port_name.count_bytes() + 1 <= IPC_PORT_PATH_MAX
421         debug_assert!(port_name.count_bytes() < IPC_PORT_PATH_MAX as usize);
422 
423         // Safety:
424         // - `cid`` is a valid uuid because we use a bindgen'd constant
425         // - `path` points to a null-terminated C-string. The null byte was appended by
426         //   `CString::new`.
427         // - `max_path` is the length of `path` in bytes including the null terminator.
428         //   It is always less than or equal to IPC_PORT_PATH_MAX.
429         // - `flags` contains a flag value accepted by the callee
430         // - `chandle_ptr` points to memory that the kernel can store a pointer into
431         //   after the callee returns.
432         let ret = unsafe {
433             ipc_port_connect_async(
434                 &zero_uuid,
435                 port_name.as_ptr(),
436                 port_name.count_bytes() + 1, /* count_bytes excludes null-byte */
437                 IPC_CONNECT_WAIT_FOR_PORT,
438                 &mut (*c.href.as_mut_ptr()).handle,
439             )
440         };
441         if ret != 0 {
442             warn!(
443                 "failed to connect to {}, remote {}, connect err {ret}",
444                 c.tipc_port_name(),
445                 c.peer.port
446             )
447         }
448 
449         debug!("wait for connection to {}, remote {}", c.tipc_port_name(), c.peer.port);
450 
451         c.state = VsockConnectionState::TipcConnecting;
452 
453         // We cannot use the address of the connection as the cookie as it may move.
454         // Use the heap address of the `handle_ref` instead as it will not get moved.
455         let cookie = c.href.as_mut_ptr() as *mut c_void;
456         c.href.set_cookie(cookie);
457         c.href.set_emask(!0);
458         c.href.set_id(c.peer.port);
459 
460         self.handle_set.attach(&mut c.href).map_err(|e| {
461             c.href.handle_close();
462             Error::Lk(e)
463         })
464     }
465 
vsock_tx_tipc_ready(&self, c: &mut VsockConnection)466     fn vsock_tx_tipc_ready(&self, c: &mut VsockConnection) {
467         if c.state != VsockConnectionState::TipcConnecting {
468             panic!("warning, got poll ready in unexpected state: {:?}", c.state);
469         }
470         info!("connected to {}, remote {:?}", c.tipc_port_name(), c.peer.port);
471         c.state = VsockConnectionState::Active;
472 
473         let buffer = [0u8];
474         let res = self.connection_manager.lock().send(c.peer, c.local_port, &buffer);
475         if res.is_err() {
476             warn!("failed to send connected status message");
477         }
478     }
479 
vsock_rx_channel( &self, c: &mut VsockConnection, length: usize, source: VsockAddr, destination: VsockAddr, ) -> Result<(), Error>480     fn vsock_rx_channel(
481         &self,
482         c: &mut VsockConnection,
483         length: usize,
484         source: VsockAddr,
485         destination: VsockAddr,
486     ) -> Result<(), Error> {
487         assert_eq!(c.state, VsockConnectionState::Active);
488 
489         // multiple messages may be available when we call recv but we want to forward
490         // them on the tipc connection one by one. Pass a slice of the rx_buffer so
491         // we only drain the number of bytes that correspond to a single vsock event.
492         c.rx_pending = self
493             .connection_manager
494             .lock()
495             .deref_mut()
496             .recv(source, destination.port, &mut c.rx_buffer[..length])
497             .unwrap();
498 
499         // TODO: handle large messages properly
500         assert_eq!(c.rx_pending, length);
501 
502         c.rx_count += 1;
503         c.rx_since_tx += 1;
504 
505         c.tipc_try_send()?;
506 
507         self.connection_manager.lock().deref_mut().update_credit(c.peer, c.local_port).unwrap();
508 
509         Ok(())
510     }
511 
vsock_send_reset(&self, peer: VsockAddr, local_port: u32)512     fn vsock_send_reset(&self, peer: VsockAddr, local_port: u32) {
513         let _ = self.connection_manager.lock().deref_mut().force_close(peer, local_port);
514     }
515 
print_stats(&self)516     fn print_stats(&self) {
517         let guard = self.connections.lock();
518         let connections = guard.deref();
519         for connection in connections {
520             connection.print_stats();
521         }
522     }
523 }
524 
vsock_rx_loop<M>(device: Arc<VsockDevice<M>>) -> Result<(), Error> where M: VsockManager,525 pub(crate) fn vsock_rx_loop<M>(device: Arc<VsockDevice<M>>) -> Result<(), Error>
526 where
527     M: VsockManager,
528 {
529     let ten_ms = Duration::from_millis(10);
530     let mut pending: Vec<VsockEvent> = vec![];
531 
532     debug!("starting vsock_rx_loop");
533 
534     // Accept connections on port zero and each name port in the port map
535     {
536         let mut connection_manager_guard = device.connection_manager.lock();
537         let connection_manager = connection_manager_guard.deref_mut();
538 
539         for port in 0..PORT_MAP.len() as u32 {
540             connection_manager.listen(port);
541         }
542     }
543 
544     loop {
545         // TODO: use interrupts instead of polling
546         // TODO: handle case where poll returns SocketError::OutputBufferTooShort
547         let event = pending
548             .pop()
549             .or_else(|| device.connection_manager.lock().deref_mut().poll().expect("poll failed"));
550 
551         if event.is_none() {
552             sleep(ten_ms);
553             continue;
554         }
555 
556         let VsockEvent { source, destination, event_type, buffer_status } = event.unwrap();
557 
558         match event_type {
559             VsockEventType::ConnectionRequest => {
560                 if let Err(e) = device.vsock_rx_op_request(source, destination) {
561                     error!("error during vsock connection request: {e:?}");
562                     device.vsock_send_reset(source, destination.port);
563                 }
564             }
565             VsockEventType::Connected => {
566                 panic!("outbound connections not supported");
567             }
568             VsockEventType::Received { length } => {
569                 debug!("recv destination: {destination:?}");
570 
571                 let connections = &mut *device.connections.lock();
572                 let lp = destination.port;
573                 let _ = vsock_connection_lookup_peer(connections, source, lp, |mut connection| {
574                     if let Err(e) = match connection {
575                         ref mut c @ VsockConnection {
576                             state: VsockConnectionState::VsockOnly, ..
577                         } => device.vsock_connect_on_rx(c, length, source, destination),
578                         ref mut c @ VsockConnection {
579                             state: VsockConnectionState::Active, ..
580                         } => device.vsock_rx_channel(c, length, source, destination),
581                         VsockConnection {
582                             state: VsockConnectionState::TipcSendBlocked, ..
583                         } => {
584                             // requeue pending event.
585                             pending.push(VsockEvent {
586                                 source,
587                                 destination,
588                                 event_type,
589                                 buffer_status,
590                             });
591                             // TODO: on one hand, we want to wait for the tipc connection to unblock
592                             // on the other, we want to pick up incoming events as soon as we can...
593                             // NOTE: Adding support for interrupts means we no longer have to sleep.
594                             sleep(ten_ms);
595                             Ok(())
596                         }
597                         VsockConnection { state: VsockConnectionState::TipcConnecting, .. } => {
598                             warn!("got data while still waiting for tipc connection");
599                             Err(LkError::ERR_BAD_STATE.into())
600                         }
601                         VsockConnection { state: s, .. } => {
602                             error!("got data for connection in state {s:?}");
603                             Err(LkError::ERR_BAD_STATE.into())
604                         }
605                     } {
606                         error!("failed to receive data from vsock connection:  {e:?}");
607                         device.vsock_send_reset(connection.peer, connection.local_port);
608 
609                         return ConnectionStateAction::Remove;
610                     }
611                     ConnectionStateAction::None
612                 })
613                 .inspect_err(|_| {
614                     warn!("got packet for unknown connection");
615                 });
616             }
617             VsockEventType::Disconnected { reason } => {
618                 debug!("disconnected from peer. reason: {reason:?}");
619                 let connections = &mut *device.connections.lock();
620                 let lp = destination.port;
621                 let _ = vsock_connection_lookup_peer(connections, source, lp, |_connection| {
622                     ConnectionStateAction::Remove
623                 })
624                 .inspect_err(|_| {
625                     warn!("got disconnect ({reason:?}) for unknown connection");
626                 });
627             }
628             VsockEventType::CreditUpdate => { /* nothing to do */ }
629             VsockEventType::CreditRequest => {
630                 // Polling the VsockConnectionManager won't return this event type
631                 panic!("don't know how to handle credit requests");
632             }
633         }
634     }
635 }
636 
vsock_tx_loop<M>(device: Arc<VsockDevice<M>>) -> Result<(), Error> where M: VsockManager,637 pub(crate) fn vsock_tx_loop<M>(device: Arc<VsockDevice<M>>) -> Result<(), Error>
638 where
639     M: VsockManager,
640 {
641     let mut timeout = Duration::MAX;
642     let ten_secs = Duration::from_secs(10);
643     let mut tx_buffer = vec![0u8; PAGE_SIZE].into_boxed_slice();
644     loop {
645         let mut href = HandleRef::default();
646         let mut ret = device.handle_set.handle_set_wait(&mut href, timeout);
647         if ret == Err(LkError::ERR_NOT_FOUND) {
648             // handle_set_wait returns ERR_NOT_FOUND if the handle_set is empty
649             // but we can wait for it to become non-empty using handle_wait.
650             // Once that that returns we have to call handle_set_wait again to
651             // get the event we care about.
652             ret = device.handle_set.handle_wait(&mut href.emask(), timeout);
653             if ret != Err(LkError::ERR_TIMED_OUT) {
654                 info!("handle_wait on handle set returned: {ret:?}");
655                 continue;
656             }
657             // fall through to ret == ERR_TIMED_OUT case, then continue
658         }
659         if ret == Err(LkError::ERR_TIMED_OUT) {
660             info!("tx inactive for {timeout:?} ms");
661             timeout = Duration::MAX;
662             device.print_stats();
663             continue;
664         }
665         if ret.is_err() {
666             warn!("handle_set_wait failed: {}", ret.unwrap_err());
667             thread::sleep(ten_secs);
668             continue;
669         }
670 
671         let cookie = href.cookie();
672         let _ = vsock_connection_lookup_cookie(&mut device.connections.lock(), cookie, |c| {
673             if href.id() != c.href.id() {
674                 panic!(
675                     "unexpected id {:?} != {:?} for connection {}",
676                     href.id(),
677                     c.href.id(),
678                     c.tipc_port_name()
679                 );
680             }
681 
682             if href.emask() & IPC_HANDLE_POLL_READY != 0 {
683                 device.vsock_tx_tipc_ready(c);
684             }
685             if href.emask() & IPC_HANDLE_POLL_MSG != 0 {
686                 // Print stats if we don't send any more packets for a while
687                 timeout = ACTIVE_TIMEOUT;
688                 // TODO: loop and read all messages?
689                 let mut msg_info = ipc_msg_info::default();
690 
691                 // TODO: add more idiomatic Rust interface
692                 // Safety:
693                 // `c.href.handle` is a valid handle to a tipc channel.
694                 // `ipc_get_msg` can store a message descriptor in `msg_info`.
695                 let ret = unsafe { ipc_get_msg(c.href.handle(), &mut msg_info) };
696                 if ret == rust_support::Error::NO_ERROR.into() {
697                     let mut iov: iovec_kern = tx_buffer.as_mut().into();
698                     let mut msg = ipc_msg_kern::new(&mut iov);
699 
700                     // Safety:
701                     // `c.href.handle` is a valid handle to a tipc channel.
702                     // `msg_info` holds the results of a successful call to `ipc_get_msg`
703                     // using the same handle.
704                     let ret = unsafe { ipc_read_msg(c.href.handle(), msg_info.id, 0, &mut msg) };
705 
706                     // Safety:
707                     // `ipc_put_msg` was called with the same handle and msg_info arguments.
708                     unsafe { ipc_put_msg(c.href.handle(), msg_info.id) };
709                     if ret >= 0 && ret as usize == msg_info.len {
710                         c.tx_count += 1;
711                         c.tx_since_rx += 1;
712                         c.rx_since_tx = 0;
713                         match device.connection_manager.lock().send(
714                             c.peer,
715                             c.local_port,
716                             &tx_buffer[..msg_info.len],
717                         ) {
718                             Err(err) => {
719                                 if err == VirtioError::SocketDeviceError(SocketError::NotConnected)
720                                 {
721                                     debug!(
722                                         "failed to send {} bytes from {}. Connection closed",
723                                         msg_info.len,
724                                         c.tipc_port_name()
725                                     );
726                                 } else {
727                                     // TODO: close connection instead
728                                     panic!(
729                                         "failed to send {} bytes from {}: {:?}",
730                                         msg_info.len,
731                                         c.tipc_port_name(),
732                                         err
733                                     );
734                                 }
735                             }
736                             Ok(_) => {
737                                 debug!("sent {} bytes from {}", msg_info.len, c.tipc_port_name());
738                             }
739                         }
740                     } else {
741                         error!("ipc_read_msg failed: {ret}");
742                     }
743                 }
744             }
745             if href.emask() & IPC_HANDLE_POLL_SEND_UNBLOCKED != 0 {
746                 assert_eq!(c.state, VsockConnectionState::TipcSendBlocked);
747                 assert_ne!(c.rx_pending, 0);
748 
749                 debug!("tipc connection unblocked {}", c.tipc_port_name());
750 
751                 if let Err(e) = c.tipc_try_send() {
752                     error!("failed to send pending message to {}: {e:?}", c.tipc_port_name());
753                 }
754             }
755             if href.emask() & IPC_HANDLE_POLL_HUP != 0 {
756                 // Print stats if we don't send any more packets for a while
757                 timeout = ACTIVE_TIMEOUT;
758                 info!("got hup");
759                 debug!(
760                     "shut down connection {}, {:?}, {:?}",
761                     c.tipc_port_name(),
762                     c.peer,
763                     c.local_port
764                 );
765                 let res = device.connection_manager.lock().shutdown(c.peer, c.local_port);
766                 if res.is_ok() {
767                     return ConnectionStateAction::Close;
768                 } else {
769                     warn!(
770                         "failed to send shutdown command, connection removed? {}",
771                         res.unwrap_err()
772                     );
773                 }
774             }
775             ConnectionStateAction::None
776         })
777         .inspect_err(|_| {
778             warn!("got event for non-existent remote {}, was it closed?", href.id());
779         });
780         href.handle_decref();
781     }
782 }
783 
vsock_init<T: Transport + 'static + Send, H: Hal + 'static>( driver: VirtIOSocket<H, T, 4096>, ) -> Result<(), Error>784 pub(crate) fn vsock_init<T: Transport + 'static + Send, H: Hal + 'static>(
785     driver: VirtIOSocket<H, T, 4096>,
786 ) -> Result<(), Error> {
787     let manager = VsockConnectionManager::new_with_capacity(driver, 4096);
788     let device_for_rx = Arc::new(VsockDevice::new(manager));
789     let device_for_tx = device_for_rx.clone();
790 
791     // In some builds, stack overflows can occur on both threads when using 4k stacks
792     let stack_size = 8192usize;
793     Builder::new()
794         .name(c"virtio_vsock_rx")
795         .priority(Priority::HIGH)
796         .stack_size(stack_size)
797         .spawn(move || {
798             let ret = vsock_rx_loop(device_for_rx);
799             error!("vsock_rx_loop returned {:?}", ret);
800             ret.err().unwrap_or(LkError::NO_ERROR.into()).into_c()
801         })
802         .map_err(|e| LkError::from_lk(e).unwrap_err())?;
803 
804     Builder::new()
805         .name(c"virtio_vsock_tx")
806         .priority(Priority::HIGH)
807         .stack_size(stack_size)
808         .spawn(move || {
809             let ret = vsock_tx_loop(device_for_tx);
810             error!("vsock_tx_loop returned {:?}", ret);
811             ret.err().unwrap_or(LkError::NO_ERROR.into()).into_c()
812         })
813         .map_err(|e| LkError::from_lk(e).unwrap_err())?;
814 
815     Ok(())
816 }
817