1 /*
2 * Copyright (c) 2024 Google Inc. All rights reserved
3 *
4 * Permission is hereby granted, free of charge, to any person obtaining
5 * a copy of this software and associated documentation files
6 * (the "Software"), to deal in the Software without restriction,
7 * including without limitation the rights to use, copy, modify, merge,
8 * publish, distribute, sublicense, and/or sell copies of the Software,
9 * and to permit persons to whom the Software is furnished to do so,
10 * subject to the following conditions:
11 *
12 * The above copyright notice and this permission notice shall be
13 * included in all copies or substantial portions of the Software.
14 *
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
16 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
17 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
18 * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
19 * CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
20 * TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
21 * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
22 */
23
24 #![deny(unsafe_op_in_unsafe_fn)]
25 use core::ffi::c_void;
26 use core::ffi::CStr;
27 use core::ops::Deref;
28 use core::ops::DerefMut;
29 use core::ptr::eq;
30 use core::ptr::null_mut;
31 use core::time::Duration;
32
33 use alloc::borrow::ToOwned;
34 use alloc::boxed::Box;
35 use alloc::ffi::CString;
36 use alloc::sync::Arc;
37 use alloc::vec;
38 use alloc::vec::Vec;
39
40 use log::debug;
41 use log::error;
42 use log::info;
43 use log::warn;
44
45 use rust_support::handle::IPC_HANDLE_POLL_HUP;
46 use rust_support::handle::IPC_HANDLE_POLL_MSG;
47 use rust_support::handle::IPC_HANDLE_POLL_READY;
48 use rust_support::handle::IPC_HANDLE_POLL_SEND_UNBLOCKED;
49 use rust_support::ipc::iovec_kern;
50 use rust_support::ipc::ipc_get_msg;
51 use rust_support::ipc::ipc_msg_info;
52 use rust_support::ipc::ipc_msg_kern;
53 use rust_support::ipc::ipc_port_connect_async;
54 use rust_support::ipc::ipc_put_msg;
55 use rust_support::ipc::ipc_read_msg;
56 use rust_support::ipc::ipc_send_msg;
57 use rust_support::ipc::zero_uuid;
58 use rust_support::ipc::IPC_CONNECT_WAIT_FOR_PORT;
59 use rust_support::ipc::IPC_PORT_PATH_MAX;
60 use rust_support::sync::Mutex;
61 use rust_support::thread;
62 use rust_support::thread::sleep;
63 use rust_support::thread::Builder;
64 use rust_support::thread::Priority;
65 use virtio_drivers_and_devices::device::socket::SocketError;
66 use virtio_drivers_and_devices::device::socket::VirtIOSocket;
67 use virtio_drivers_and_devices::device::socket::VsockAddr;
68 use virtio_drivers_and_devices::device::socket::VsockConnectionManager;
69 use virtio_drivers_and_devices::device::socket::VsockEvent;
70 use virtio_drivers_and_devices::device::socket::VsockEventType;
71 use virtio_drivers_and_devices::device::socket::VsockManager;
72 use virtio_drivers_and_devices::transport::Transport;
73 use virtio_drivers_and_devices::Error as VirtioError;
74 use virtio_drivers_and_devices::Hal;
75 use virtio_drivers_and_devices::PAGE_SIZE;
76
77 use rust_support::handle::HandleRef;
78 use rust_support::handle_set::HandleSet;
79
80 use rust_support::Error as LkError;
81
82 use crate::err::Error;
83
/// Idle period the tx loop allows after tipc activity (a message or a hang-up)
/// before it logs the statistics of every connection.
const ACTIVE_TIMEOUT: Duration = Duration::from_millis(5_000);
85
/// One slot of the table that maps vsock port numbers to tipc ports.
struct TipcPortAcl {
    /// Name of the tipc port that connections to this slot are forwarded to.
    name: &'static CStr,
    /// Whether connection requests for this slot are accepted.
    enabled: bool,
}
90
// Generates the constant table that holds the ACL for every tipc port reachable over vsock.
//
// A listed port is enabled when the feature named by its `feature_name` is defined and disabled
// otherwise. Two extra, always enabled entries with an empty name are placed in front of the
// listed ports. They stand for ports 0 and 1, where the connection sends the tipc port name in
// its first packet. The number in brackets is the count of listed ports; the generated table
// has that many entries plus two.
macro_rules! comm_port_feature_enable {
    (
        $table:ident[$listed_ports:literal] = {
            $({ port_name: $name:literal, feature_name: $feature:literal }),+ $(,)*
        }
    ) => {
        const $table: [TipcPortAcl; $listed_ports + 2] = [
            // connections on port zero must send port name in first packet
            TipcPortAcl { name: c"", enabled: true },
            // temporary workaround to not change the port 1 to port 0
            TipcPortAcl { name: c"", enabled: true },
            $(
                #[cfg(feature = $feature)]
                TipcPortAcl { name: $name, enabled: true },
                #[cfg(not(feature = $feature))]
                TipcPortAcl { name: $name, enabled: false },
            )+
        ];
    }
}
109
110 // Mapping of vsock port numbers to tipc port names.
111 //
112 // Each tipc port name must be shorter than IPC_PORT_PATH_MAX.
113 comm_port_feature_enable! {
114 PORT_MAP[8] = {
115 {port_name: c"com.android.trusty.authmgr", feature_name: "authmgr"},
116 {port_name: c"com.android.trusty.hwcryptooperations", feature_name: "hwcrypto_hal"},
117 {port_name: c"com.android.trusty.rust.hwcryptohal.V1", feature_name: "hwcrypto_hal"},
118 {port_name: c"com.android.trusty.securestorage", feature_name: "securestorage_hal"},
119 {port_name: c"com.android.trusty.widevine.transact", feature_name: "widevine_aidl_comm"},
120 {port_name: c"com.android.trusty.storage.proxy", feature_name: "securestorage_hal"},
121 {port_name: c"com.android.trusty.gatekeeper", feature_name: "gatekeeper"},
122 {port_name: c"com.android.trusty.keymint", feature_name: "keymint"},
123 }
124 }
125
/// Lifecycle of a bridged connection, tracking both its vsock side and its tipc side.
// Not every variant is constructed in this file (e.g. `TipcOnly`), hence the allow.
#[allow(dead_code)]
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
enum VsockConnectionState {
    /// Value produced by `Default`; no connection is created in this state on purpose.
    #[default]
    Invalid = 0,
    /// The vsock connection exists but no tipc connection has been started yet.
    /// This is the state every new connection starts in.
    VsockOnly,
    /// Not assigned anywhere in this file.
    TipcOnly,
    /// An asynchronous tipc connect has been issued and has not reported ready yet.
    TipcConnecting,
    /// A tipc send failed for lack of buffer space; the unsent bytes are kept in the
    /// connection's rx buffer until tipc signals that sending is unblocked.
    TipcSendBlocked,
    /// Both sides are connected and data can be forwarded.
    Active,
    /// The tipc side is closed (or never existed); the vsock side has not been torn down yet.
    TipcClosed,
    /// Both sides are closed; the connection can be removed.
    Closed,
}
139
140 #[derive(Default)]
141 struct VsockConnection {
142 peer: VsockAddr,
143 local_port: u32,
144 state: VsockConnectionState,
145 tipc_port_name: Option<CString>,
146 href: HandleRef,
147 tx_count: u64,
148 tx_since_rx: u64,
149 rx_count: u64,
150 rx_since_tx: u64,
151 rx_buffer: Box<[u8]>, // buffers data if the tipc connection blocks
152 rx_pending: usize, // how many bytes to send when tipc unblocks
153 }
154
155 impl VsockConnection {
new(peer: VsockAddr, local_port: u32) -> Self156 fn new(peer: VsockAddr, local_port: u32) -> Self {
157 // Make rx_buffer twice as large as the vsock connection rx buffer such
158 // that we can buffer pending messages if TIPC blocks.
159 //
160 // TODO: the ideal rx_buffer size depends on the connection so it might
161 // be worthwhile to dynamically re-size the buffer in response to tipc
162 // blocking or unblocking.
163 let rx_buffer_len = 2 * PAGE_SIZE;
164 Self {
165 peer,
166 local_port,
167 state: VsockConnectionState::VsockOnly,
168 tipc_port_name: None,
169 rx_buffer: vec![0u8; rx_buffer_len].into_boxed_slice(),
170 ..Default::default()
171 }
172 }
173
tipc_port_name(&self) -> &str174 fn tipc_port_name(&self) -> &str {
175 self.tipc_port_name
176 .as_ref()
177 .map(|s| s.to_str().expect("invalid port name"))
178 .unwrap_or("(no port name)")
179 }
180
print_stats(&self)181 fn print_stats(&self) {
182 info!(
183 "vsock: tx {:?} ({:>5?}) rx {:?} ({:>5?}) port: {}, remote {}, state {:?}",
184 self.tx_since_rx,
185 self.tx_count,
186 self.rx_since_tx,
187 self.rx_count,
188 self.tipc_port_name(),
189 self.peer.port,
190 self.state
191 );
192 }
193
tipc_try_send(&mut self) -> Result<(), Error>194 fn tipc_try_send(&mut self) -> Result<(), Error> {
195 debug_assert!(self.rx_pending > 0 && self.rx_pending < PAGE_SIZE);
196 debug_assert!(
197 self.state == VsockConnectionState::Active
198 || self.state == VsockConnectionState::TipcSendBlocked
199 );
200
201 let length = self.rx_pending;
202 let mut iov = iovec_kern { iov_base: self.rx_buffer.as_mut_ptr() as _, iov_len: length };
203 let mut msg = ipc_msg_kern::new(&mut iov);
204
205 // Safety:
206 // `c.href.handle` is a handle attached to a tipc channel.
207 // `msg` contains an `iov` which points to a buffer from which
208 // the kernel can read `iov_len` bytes.
209 let ret = unsafe { ipc_send_msg(self.href.handle(), &mut msg) };
210 if ret == LkError::ERR_NOT_ENOUGH_BUFFER.into() {
211 self.state = VsockConnectionState::TipcSendBlocked;
212 return Ok(());
213 } else if ret < 0 {
214 error!("failed to send {length} bytes to {}: {ret} ", self.tipc_port_name());
215 LkError::from_lk(ret)?;
216 } else if ret as usize != length {
217 // TODO: in streaming mode, this should not be an error. Instead, consume
218 // the data that was sent and try sending the rest in the next message.
219 error!("sent {ret} bytes but expected to send {length} bytes");
220 return Err(LkError::ERR_BAD_LEN.into());
221 }
222
223 self.state = VsockConnectionState::Active;
224 self.tx_since_rx = 0;
225 self.rx_pending = 0;
226
227 debug!("sent {length} bytes to {}", self.tipc_port_name());
228
229 Ok(())
230 }
231 }
232
/// The action to take after running the `f` closure in [`vsock_connection_lookup_by`]
/// (or one of its wrappers, [`vsock_connection_lookup_peer`] and
/// [`vsock_connection_lookup_cookie`]).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum ConnectionStateAction {
    /// No action needs to be taken, so the connection stays open.
    None,

    /// TIPC has requested that the connection be closed.
    /// This closes the connection and waits for the peer to acknowledge before removing it.
    Close,

    /// We want to close the connection and remove it
    /// without waiting for the peer to acknowledge it,
    /// such as when there is an error (but also potentially other reasons).
    Remove,
}
248
vsock_connection_lookup_by( connections: &mut Vec<VsockConnection>, predicate: impl Fn(&VsockConnection) -> bool, f: impl FnOnce(&mut VsockConnection) -> ConnectionStateAction, ) -> Result<(), ()>249 fn vsock_connection_lookup_by(
250 connections: &mut Vec<VsockConnection>,
251 predicate: impl Fn(&VsockConnection) -> bool,
252 f: impl FnOnce(&mut VsockConnection) -> ConnectionStateAction,
253 ) -> Result<(), ()> {
254 let index = connections.iter().position(predicate).ok_or(())?;
255 let action = f(&mut connections[index]);
256 if action == ConnectionStateAction::None {
257 return Ok(());
258 }
259
260 if vsock_connection_close(&mut connections[index], action) {
261 connections.swap_remove(index);
262 }
263
264 Ok(())
265 }
266
vsock_connection_lookup_peer( connections: &mut Vec<VsockConnection>, peer: VsockAddr, local_port: u32, f: impl FnOnce(&mut VsockConnection) -> ConnectionStateAction, ) -> Result<(), ()>267 fn vsock_connection_lookup_peer(
268 connections: &mut Vec<VsockConnection>,
269 peer: VsockAddr,
270 local_port: u32,
271 f: impl FnOnce(&mut VsockConnection) -> ConnectionStateAction,
272 ) -> Result<(), ()> {
273 vsock_connection_lookup_by(
274 connections,
275 |c: &VsockConnection| c.peer == peer && c.local_port == local_port,
276 f,
277 )
278 }
279
vsock_connection_lookup_cookie( connections: &mut Vec<VsockConnection>, cookie: *mut c_void, f: impl FnOnce(&mut VsockConnection) -> ConnectionStateAction, ) -> Result<(), ()>280 fn vsock_connection_lookup_cookie(
281 connections: &mut Vec<VsockConnection>,
282 cookie: *mut c_void,
283 f: impl FnOnce(&mut VsockConnection) -> ConnectionStateAction,
284 ) -> Result<(), ()> {
285 vsock_connection_lookup_by(
286 connections,
287 |c: &VsockConnection| eq(c.href.as_ptr().cast::<c_void>(), cookie),
288 f,
289 )
290 }
291
vsock_connection_close(c: &mut VsockConnection, action: ConnectionStateAction) -> bool292 fn vsock_connection_close(c: &mut VsockConnection, action: ConnectionStateAction) -> bool {
293 info!(
294 "remote_port {}, tipc_port_name {}, state {:?}",
295 c.peer.port,
296 c.tipc_port_name(),
297 c.state
298 );
299
300 if c.state == VsockConnectionState::VsockOnly {
301 info!("tipc vsock only connection closed");
302 c.state = VsockConnectionState::TipcClosed;
303 }
304
305 if c.state == VsockConnectionState::Active
306 || c.state == VsockConnectionState::TipcConnecting
307 || c.state == VsockConnectionState::TipcSendBlocked
308 {
309 // The handle set owns the only reference we have to the handle and
310 // handle_set_wait might have already returned a pointer to c
311 c.href.detach();
312 c.href.handle_close();
313 c.href.set_cookie(null_mut());
314 info!("tipc handle closed");
315 c.state = VsockConnectionState::TipcClosed;
316 }
317 if action == ConnectionStateAction::Remove && c.state == VsockConnectionState::TipcClosed {
318 info!("vsock closed");
319 c.state = VsockConnectionState::Closed;
320 }
321 if c.state == VsockConnectionState::Closed && c.href.cookie().is_null() {
322 info!("remove connection");
323 c.print_stats();
324 return true; // remove connection
325 }
326 false // keep connection
327 }
328
329 pub struct VsockDevice<M>
330 where
331 M: VsockManager,
332 {
333 connections: Mutex<Vec<VsockConnection>>,
334 handle_set: HandleSet,
335 connection_manager: Mutex<M>,
336 }
337
338 impl<M> VsockDevice<M>
339 where
340 M: VsockManager,
341 {
new(manager: M) -> Self342 pub(crate) fn new(manager: M) -> Self {
343 Self {
344 connections: Mutex::new(Vec::new()),
345 handle_set: HandleSet::new(),
346 connection_manager: Mutex::new(manager),
347 }
348 }
349
vsock_rx_op_request(&self, peer: VsockAddr, local: VsockAddr) -> Result<(), Error>350 fn vsock_rx_op_request(&self, peer: VsockAddr, local: VsockAddr) -> Result<(), Error> {
351 debug!("dst_port {}, src_port {}", local.port, peer.port);
352
353 // do we already have a connection?
354 let mut guard = self.connections.lock();
355 if guard
356 .deref()
357 .iter()
358 .any(|connection| connection.peer == peer && connection.local_port == local.port)
359 {
360 return Err(LkError::ERR_ALREADY_EXISTS.into());
361 };
362
363 let mut c = VsockConnection::new(peer, local.port);
364
365 // ports greater than 1 use port map to determine what tipc port to connect to
366 if [0, 1].contains(&local.port) {
367 // wait on peer to send tipc port name
368 } else if (local.port as usize) < PORT_MAP.len() {
369 if PORT_MAP[local.port as usize].enabled {
370 c.tipc_port_name = Some(PORT_MAP[local.port as usize].name.to_owned());
371 self.vsock_connect_tipc(&mut c)?;
372 } else {
373 return Err(LkError::ERR_NOT_VALID.into());
374 }
375 } else {
376 return Err(LkError::ERR_OUT_OF_RANGE.into());
377 }
378
379 guard.deref_mut().push(c);
380
381 Ok(())
382 }
383
vsock_connect_on_rx( &self, c: &mut VsockConnection, length: usize, source: VsockAddr, destination: VsockAddr, ) -> Result<(), Error>384 fn vsock_connect_on_rx(
385 &self,
386 c: &mut VsockConnection,
387 length: usize,
388 source: VsockAddr,
389 destination: VsockAddr,
390 ) -> Result<(), Error> {
391 // destination port should be zero or one, otherwise, connection should not
392 // be in VsockOnly state (not already connected/connecting to tipc).
393 assert!([0, 1].contains(&destination.port));
394
395 let mut buffer = [0; IPC_PORT_PATH_MAX as usize];
396 assert!(length < buffer.len());
397 let mut data_len = self
398 .connection_manager
399 .lock()
400 .deref_mut()
401 .recv(source, destination.port, &mut buffer)
402 .unwrap();
403 assert!(data_len == length);
404 // allow manual connect from nc in line mode
405 if buffer[data_len - 1] == b'\n' as _ {
406 data_len -= 1;
407 }
408 let port_name = &buffer[0..data_len];
409 info!("port_name is {:?}", port_name);
410
411 // should not contain any null bytes
412 c.tipc_port_name = CString::new(port_name).ok();
413 info!("tipc port name set to {}", c.tipc_port_name());
414
415 self.vsock_connect_tipc(c)
416 }
417
vsock_connect_tipc(&self, c: &mut VsockConnection) -> Result<(), Error>418 fn vsock_connect_tipc(&self, c: &mut VsockConnection) -> Result<(), Error> {
419 let port_name = c.tipc_port_name.as_ref().expect("tipc port name has been set");
420 // invariant: port_name.count_bytes() + 1 <= IPC_PORT_PATH_MAX
421 debug_assert!(port_name.count_bytes() < IPC_PORT_PATH_MAX as usize);
422
423 // Safety:
424 // - `cid`` is a valid uuid because we use a bindgen'd constant
425 // - `path` points to a null-terminated C-string. The null byte was appended by
426 // `CString::new`.
427 // - `max_path` is the length of `path` in bytes including the null terminator.
428 // It is always less than or equal to IPC_PORT_PATH_MAX.
429 // - `flags` contains a flag value accepted by the callee
430 // - `chandle_ptr` points to memory that the kernel can store a pointer into
431 // after the callee returns.
432 let ret = unsafe {
433 ipc_port_connect_async(
434 &zero_uuid,
435 port_name.as_ptr(),
436 port_name.count_bytes() + 1, /* count_bytes excludes null-byte */
437 IPC_CONNECT_WAIT_FOR_PORT,
438 &mut (*c.href.as_mut_ptr()).handle,
439 )
440 };
441 if ret != 0 {
442 warn!(
443 "failed to connect to {}, remote {}, connect err {ret}",
444 c.tipc_port_name(),
445 c.peer.port
446 )
447 }
448
449 debug!("wait for connection to {}, remote {}", c.tipc_port_name(), c.peer.port);
450
451 c.state = VsockConnectionState::TipcConnecting;
452
453 // We cannot use the address of the connection as the cookie as it may move.
454 // Use the heap address of the `handle_ref` instead as it will not get moved.
455 let cookie = c.href.as_mut_ptr() as *mut c_void;
456 c.href.set_cookie(cookie);
457 c.href.set_emask(!0);
458 c.href.set_id(c.peer.port);
459
460 self.handle_set.attach(&mut c.href).map_err(|e| {
461 c.href.handle_close();
462 Error::Lk(e)
463 })
464 }
465
vsock_tx_tipc_ready(&self, c: &mut VsockConnection)466 fn vsock_tx_tipc_ready(&self, c: &mut VsockConnection) {
467 if c.state != VsockConnectionState::TipcConnecting {
468 panic!("warning, got poll ready in unexpected state: {:?}", c.state);
469 }
470 info!("connected to {}, remote {:?}", c.tipc_port_name(), c.peer.port);
471 c.state = VsockConnectionState::Active;
472
473 let buffer = [0u8];
474 let res = self.connection_manager.lock().send(c.peer, c.local_port, &buffer);
475 if res.is_err() {
476 warn!("failed to send connected status message");
477 }
478 }
479
vsock_rx_channel( &self, c: &mut VsockConnection, length: usize, source: VsockAddr, destination: VsockAddr, ) -> Result<(), Error>480 fn vsock_rx_channel(
481 &self,
482 c: &mut VsockConnection,
483 length: usize,
484 source: VsockAddr,
485 destination: VsockAddr,
486 ) -> Result<(), Error> {
487 assert_eq!(c.state, VsockConnectionState::Active);
488
489 // multiple messages may be available when we call recv but we want to forward
490 // them on the tipc connection one by one. Pass a slice of the rx_buffer so
491 // we only drain the number of bytes that correspond to a single vsock event.
492 c.rx_pending = self
493 .connection_manager
494 .lock()
495 .deref_mut()
496 .recv(source, destination.port, &mut c.rx_buffer[..length])
497 .unwrap();
498
499 // TODO: handle large messages properly
500 assert_eq!(c.rx_pending, length);
501
502 c.rx_count += 1;
503 c.rx_since_tx += 1;
504
505 c.tipc_try_send()?;
506
507 self.connection_manager.lock().deref_mut().update_credit(c.peer, c.local_port).unwrap();
508
509 Ok(())
510 }
511
vsock_send_reset(&self, peer: VsockAddr, local_port: u32)512 fn vsock_send_reset(&self, peer: VsockAddr, local_port: u32) {
513 let _ = self.connection_manager.lock().deref_mut().force_close(peer, local_port);
514 }
515
print_stats(&self)516 fn print_stats(&self) {
517 let guard = self.connections.lock();
518 let connections = guard.deref();
519 for connection in connections {
520 connection.print_stats();
521 }
522 }
523 }
524
vsock_rx_loop<M>(device: Arc<VsockDevice<M>>) -> Result<(), Error> where M: VsockManager,525 pub(crate) fn vsock_rx_loop<M>(device: Arc<VsockDevice<M>>) -> Result<(), Error>
526 where
527 M: VsockManager,
528 {
529 let ten_ms = Duration::from_millis(10);
530 let mut pending: Vec<VsockEvent> = vec![];
531
532 debug!("starting vsock_rx_loop");
533
534 // Accept connections on port zero and each name port in the port map
535 {
536 let mut connection_manager_guard = device.connection_manager.lock();
537 let connection_manager = connection_manager_guard.deref_mut();
538
539 for port in 0..PORT_MAP.len() as u32 {
540 connection_manager.listen(port);
541 }
542 }
543
544 loop {
545 // TODO: use interrupts instead of polling
546 // TODO: handle case where poll returns SocketError::OutputBufferTooShort
547 let event = pending
548 .pop()
549 .or_else(|| device.connection_manager.lock().deref_mut().poll().expect("poll failed"));
550
551 if event.is_none() {
552 sleep(ten_ms);
553 continue;
554 }
555
556 let VsockEvent { source, destination, event_type, buffer_status } = event.unwrap();
557
558 match event_type {
559 VsockEventType::ConnectionRequest => {
560 if let Err(e) = device.vsock_rx_op_request(source, destination) {
561 error!("error during vsock connection request: {e:?}");
562 device.vsock_send_reset(source, destination.port);
563 }
564 }
565 VsockEventType::Connected => {
566 panic!("outbound connections not supported");
567 }
568 VsockEventType::Received { length } => {
569 debug!("recv destination: {destination:?}");
570
571 let connections = &mut *device.connections.lock();
572 let lp = destination.port;
573 let _ = vsock_connection_lookup_peer(connections, source, lp, |mut connection| {
574 if let Err(e) = match connection {
575 ref mut c @ VsockConnection {
576 state: VsockConnectionState::VsockOnly, ..
577 } => device.vsock_connect_on_rx(c, length, source, destination),
578 ref mut c @ VsockConnection {
579 state: VsockConnectionState::Active, ..
580 } => device.vsock_rx_channel(c, length, source, destination),
581 VsockConnection {
582 state: VsockConnectionState::TipcSendBlocked, ..
583 } => {
584 // requeue pending event.
585 pending.push(VsockEvent {
586 source,
587 destination,
588 event_type,
589 buffer_status,
590 });
591 // TODO: on one hand, we want to wait for the tipc connection to unblock
592 // on the other, we want to pick up incoming events as soon as we can...
593 // NOTE: Adding support for interrupts means we no longer have to sleep.
594 sleep(ten_ms);
595 Ok(())
596 }
597 VsockConnection { state: VsockConnectionState::TipcConnecting, .. } => {
598 warn!("got data while still waiting for tipc connection");
599 Err(LkError::ERR_BAD_STATE.into())
600 }
601 VsockConnection { state: s, .. } => {
602 error!("got data for connection in state {s:?}");
603 Err(LkError::ERR_BAD_STATE.into())
604 }
605 } {
606 error!("failed to receive data from vsock connection: {e:?}");
607 device.vsock_send_reset(connection.peer, connection.local_port);
608
609 return ConnectionStateAction::Remove;
610 }
611 ConnectionStateAction::None
612 })
613 .inspect_err(|_| {
614 warn!("got packet for unknown connection");
615 });
616 }
617 VsockEventType::Disconnected { reason } => {
618 debug!("disconnected from peer. reason: {reason:?}");
619 let connections = &mut *device.connections.lock();
620 let lp = destination.port;
621 let _ = vsock_connection_lookup_peer(connections, source, lp, |_connection| {
622 ConnectionStateAction::Remove
623 })
624 .inspect_err(|_| {
625 warn!("got disconnect ({reason:?}) for unknown connection");
626 });
627 }
628 VsockEventType::CreditUpdate => { /* nothing to do */ }
629 VsockEventType::CreditRequest => {
630 // Polling the VsockConnectionManager won't return this event type
631 panic!("don't know how to handle credit requests");
632 }
633 }
634 }
635 }
636
vsock_tx_loop<M>(device: Arc<VsockDevice<M>>) -> Result<(), Error> where M: VsockManager,637 pub(crate) fn vsock_tx_loop<M>(device: Arc<VsockDevice<M>>) -> Result<(), Error>
638 where
639 M: VsockManager,
640 {
641 let mut timeout = Duration::MAX;
642 let ten_secs = Duration::from_secs(10);
643 let mut tx_buffer = vec![0u8; PAGE_SIZE].into_boxed_slice();
644 loop {
645 let mut href = HandleRef::default();
646 let mut ret = device.handle_set.handle_set_wait(&mut href, timeout);
647 if ret == Err(LkError::ERR_NOT_FOUND) {
648 // handle_set_wait returns ERR_NOT_FOUND if the handle_set is empty
649 // but we can wait for it to become non-empty using handle_wait.
650 // Once that that returns we have to call handle_set_wait again to
651 // get the event we care about.
652 ret = device.handle_set.handle_wait(&mut href.emask(), timeout);
653 if ret != Err(LkError::ERR_TIMED_OUT) {
654 info!("handle_wait on handle set returned: {ret:?}");
655 continue;
656 }
657 // fall through to ret == ERR_TIMED_OUT case, then continue
658 }
659 if ret == Err(LkError::ERR_TIMED_OUT) {
660 info!("tx inactive for {timeout:?} ms");
661 timeout = Duration::MAX;
662 device.print_stats();
663 continue;
664 }
665 if ret.is_err() {
666 warn!("handle_set_wait failed: {}", ret.unwrap_err());
667 thread::sleep(ten_secs);
668 continue;
669 }
670
671 let cookie = href.cookie();
672 let _ = vsock_connection_lookup_cookie(&mut device.connections.lock(), cookie, |c| {
673 if href.id() != c.href.id() {
674 panic!(
675 "unexpected id {:?} != {:?} for connection {}",
676 href.id(),
677 c.href.id(),
678 c.tipc_port_name()
679 );
680 }
681
682 if href.emask() & IPC_HANDLE_POLL_READY != 0 {
683 device.vsock_tx_tipc_ready(c);
684 }
685 if href.emask() & IPC_HANDLE_POLL_MSG != 0 {
686 // Print stats if we don't send any more packets for a while
687 timeout = ACTIVE_TIMEOUT;
688 // TODO: loop and read all messages?
689 let mut msg_info = ipc_msg_info::default();
690
691 // TODO: add more idiomatic Rust interface
692 // Safety:
693 // `c.href.handle` is a valid handle to a tipc channel.
694 // `ipc_get_msg` can store a message descriptor in `msg_info`.
695 let ret = unsafe { ipc_get_msg(c.href.handle(), &mut msg_info) };
696 if ret == rust_support::Error::NO_ERROR.into() {
697 let mut iov: iovec_kern = tx_buffer.as_mut().into();
698 let mut msg = ipc_msg_kern::new(&mut iov);
699
700 // Safety:
701 // `c.href.handle` is a valid handle to a tipc channel.
702 // `msg_info` holds the results of a successful call to `ipc_get_msg`
703 // using the same handle.
704 let ret = unsafe { ipc_read_msg(c.href.handle(), msg_info.id, 0, &mut msg) };
705
706 // Safety:
707 // `ipc_put_msg` was called with the same handle and msg_info arguments.
708 unsafe { ipc_put_msg(c.href.handle(), msg_info.id) };
709 if ret >= 0 && ret as usize == msg_info.len {
710 c.tx_count += 1;
711 c.tx_since_rx += 1;
712 c.rx_since_tx = 0;
713 match device.connection_manager.lock().send(
714 c.peer,
715 c.local_port,
716 &tx_buffer[..msg_info.len],
717 ) {
718 Err(err) => {
719 if err == VirtioError::SocketDeviceError(SocketError::NotConnected)
720 {
721 debug!(
722 "failed to send {} bytes from {}. Connection closed",
723 msg_info.len,
724 c.tipc_port_name()
725 );
726 } else {
727 // TODO: close connection instead
728 panic!(
729 "failed to send {} bytes from {}: {:?}",
730 msg_info.len,
731 c.tipc_port_name(),
732 err
733 );
734 }
735 }
736 Ok(_) => {
737 debug!("sent {} bytes from {}", msg_info.len, c.tipc_port_name());
738 }
739 }
740 } else {
741 error!("ipc_read_msg failed: {ret}");
742 }
743 }
744 }
745 if href.emask() & IPC_HANDLE_POLL_SEND_UNBLOCKED != 0 {
746 assert_eq!(c.state, VsockConnectionState::TipcSendBlocked);
747 assert_ne!(c.rx_pending, 0);
748
749 debug!("tipc connection unblocked {}", c.tipc_port_name());
750
751 if let Err(e) = c.tipc_try_send() {
752 error!("failed to send pending message to {}: {e:?}", c.tipc_port_name());
753 }
754 }
755 if href.emask() & IPC_HANDLE_POLL_HUP != 0 {
756 // Print stats if we don't send any more packets for a while
757 timeout = ACTIVE_TIMEOUT;
758 info!("got hup");
759 debug!(
760 "shut down connection {}, {:?}, {:?}",
761 c.tipc_port_name(),
762 c.peer,
763 c.local_port
764 );
765 let res = device.connection_manager.lock().shutdown(c.peer, c.local_port);
766 if res.is_ok() {
767 return ConnectionStateAction::Close;
768 } else {
769 warn!(
770 "failed to send shutdown command, connection removed? {}",
771 res.unwrap_err()
772 );
773 }
774 }
775 ConnectionStateAction::None
776 })
777 .inspect_err(|_| {
778 warn!("got event for non-existent remote {}, was it closed?", href.id());
779 });
780 href.handle_decref();
781 }
782 }
783
vsock_init<T: Transport + 'static + Send, H: Hal + 'static>( driver: VirtIOSocket<H, T, 4096>, ) -> Result<(), Error>784 pub(crate) fn vsock_init<T: Transport + 'static + Send, H: Hal + 'static>(
785 driver: VirtIOSocket<H, T, 4096>,
786 ) -> Result<(), Error> {
787 let manager = VsockConnectionManager::new_with_capacity(driver, 4096);
788 let device_for_rx = Arc::new(VsockDevice::new(manager));
789 let device_for_tx = device_for_rx.clone();
790
791 // In some builds, stack overflows can occur on both threads when using 4k stacks
792 let stack_size = 8192usize;
793 Builder::new()
794 .name(c"virtio_vsock_rx")
795 .priority(Priority::HIGH)
796 .stack_size(stack_size)
797 .spawn(move || {
798 let ret = vsock_rx_loop(device_for_rx);
799 error!("vsock_rx_loop returned {:?}", ret);
800 ret.err().unwrap_or(LkError::NO_ERROR.into()).into_c()
801 })
802 .map_err(|e| LkError::from_lk(e).unwrap_err())?;
803
804 Builder::new()
805 .name(c"virtio_vsock_tx")
806 .priority(Priority::HIGH)
807 .stack_size(stack_size)
808 .spawn(move || {
809 let ret = vsock_tx_loop(device_for_tx);
810 error!("vsock_tx_loop returned {:?}", ret);
811 ret.err().unwrap_or(LkError::NO_ERROR.into()).into_c()
812 })
813 .map_err(|e| LkError::from_lk(e).unwrap_err())?;
814
815 Ok(())
816 }
817