• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2023 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 //! Implementation of control port used for multi-port enabled virtio-console
6 
7 use std::collections::VecDeque;
8 use std::sync::Arc;
9 
10 use anyhow::anyhow;
11 use anyhow::Context;
12 use anyhow::Result;
13 use base::debug;
14 use base::error;
15 use cros_async::select2;
16 use cros_async::EventAsync;
17 use cros_async::Executor;
18 use data_model::Le16;
19 use data_model::Le32;
20 use futures::channel::mpsc;
21 use futures::FutureExt;
22 use futures::SinkExt;
23 use futures::StreamExt;
24 use sync::Mutex;
25 use zerocopy::AsBytes;
26 use zerocopy::FromBytes;
27 use zerocopy::FromZeroes;
28 
29 use super::handle_input;
30 use crate::virtio;
31 use crate::virtio::async_device::AsyncQueueState;
32 use crate::virtio::console::ConsoleError;
33 use crate::virtio::Interrupt;
34 use crate::virtio::Queue;
35 use crate::virtio::Reader;
36 
37 type ControlMsgBytes = VecDeque<u8>;
38 
39 #[derive(Clone, Debug, Default, FromZeroes, FromBytes, AsBytes)]
40 #[repr(C)]
41 struct ControlMsg {
42     id: Le32,
43     event: Le16,
44     value: Le16,
45 }
46 
47 impl ControlMsg {
new(id: u32, event: ControlEvent, value: u16) -> ControlMsg48     fn new(id: u32, event: ControlEvent, value: u16) -> ControlMsg {
49         ControlMsg {
50             id: Le32::from(id),
51             event: Le16::from(event as u16),
52             value: Le16::from(value),
53         }
54     }
55 }
56 
57 #[derive(Debug, PartialEq, enumn::N)]
58 enum ControlEvent {
59     DeviceReady = 0,
60     DeviceAdd = 1,
61     DeviceRemove = 2,
62     PortReady = 3,
63     ConsolePort = 4,
64     Resize = 5,
65     PortOpen = 6,
66     PortName = 7,
67 }
68 impl TryFrom<u16> for ControlEvent {
69     type Error = anyhow::Error;
70 
try_from(value: u16) -> Result<Self>71     fn try_from(value: u16) -> Result<Self> {
72         match ControlEvent::n(value) {
73             Some(event) => Ok(event),
74             None => Err(anyhow!("unsupported event {}", value)),
75         }
76     }
77 }
78 
process_tx_ctrl_msg( reader: &mut Reader, ports: &Vec<ConsolePortInfo>, ) -> Result<Vec<ControlMsgBytes>>79 fn process_tx_ctrl_msg(
80     reader: &mut Reader,
81     ports: &Vec<ConsolePortInfo>,
82 ) -> Result<Vec<ControlMsgBytes>> {
83     let mut messages = Vec::<ControlMsgBytes>::new();
84     let ports_num = ports.len() as u32;
85     let ctrl_msg: ControlMsg = reader.read_obj().context("failed to read from reader")?;
86     let id = ctrl_msg.id.to_native();
87     let event = ControlEvent::try_from(ctrl_msg.event.to_native())?;
88     let value: u16 = ctrl_msg.value.to_native();
89 
90     if id >= ports_num && event != ControlEvent::DeviceReady {
91         return Err(anyhow!("console: id {} out of range", id));
92     }
93 
94     match event {
95         ControlEvent::DeviceReady => {
96             // value of 1 indicates success, and 0 indicates failure
97             if value == 1 {
98                 for id in 0..ports_num {
99                     let msg = ControlMsg::new(id, ControlEvent::DeviceAdd, 0);
100                     let _ = msg.as_bytes();
101                     messages.push(msg.as_bytes().to_owned().into());
102 
103                     let name = ports[id as usize].name.clone();
104                     let msg = ControlMsg::new(id, ControlEvent::PortName, 0);
105                     let mut reply: ControlMsgBytes = msg.as_bytes().to_owned().into();
106                     reply.extend(name.as_bytes());
107                     messages.push(reply);
108                 }
109             } else {
110                 error!("console: received event {:?} value {}", event, value);
111             }
112         }
113         ControlEvent::PortReady => {
114             // value of 1 indicates success, and 0 indicates failure
115             if value == 1 {
116                 let msg = ControlMsg::new(id, ControlEvent::PortOpen, 1);
117                 messages.push(msg.as_bytes().to_owned().into());
118 
119                 let is_console = ports[id as usize].console;
120                 if is_console {
121                     let msg = ControlMsg::new(id, ControlEvent::ConsolePort, 1);
122                     messages.push(msg.as_bytes().to_owned().into());
123                 }
124             } else {
125                 error!("console: received event {:?} value {}", event, value);
126             }
127         }
128         ControlEvent::PortOpen => match value {
129             // Currently, port state change is not supported, default is open.
130             // And only print debug info here.
131             0 => debug!("console port{} close", id),
132             1 => debug!("console port{} open", id),
133             _ => error!("console port{} open {}", id, value),
134         },
135         _ => {
136             return Err(anyhow!("console: unexpected control event {:?}", event));
137         }
138     }
139 
140     Ok(messages)
141 }
142 
process_tx_ctrl_queue( queue: &Arc<Mutex<Queue>>, doorbell: &Interrupt, ports: &Vec<ConsolePortInfo>, ) -> Vec<ControlMsgBytes>143 fn process_tx_ctrl_queue(
144     queue: &Arc<Mutex<Queue>>,
145     doorbell: &Interrupt,
146     ports: &Vec<ConsolePortInfo>,
147 ) -> Vec<ControlMsgBytes> {
148     let mut needs_interrupt = false;
149     let mut messages = Vec::<ControlMsgBytes>::new();
150     let mut queue = queue.try_lock().expect("Lock should not be unavailable");
151 
152     while let Some(mut avail_desc) = queue.pop() {
153         match process_tx_ctrl_msg(&mut avail_desc.reader, ports) {
154             Ok(mut msg) => messages.append(&mut msg),
155             Err(e) => {
156                 error!("console: failed to handle control msg: {}", e);
157             }
158         }
159 
160         queue.add_used(avail_desc, 0);
161         needs_interrupt = true;
162     }
163 
164     if needs_interrupt {
165         queue.trigger_interrupt(doorbell);
166     }
167 
168     messages
169 }
170 
run_tx_ctrl_queue( queue: &Arc<Mutex<Queue>>, doorbell: Interrupt, kick_evt: EventAsync, sender: &mut mpsc::UnboundedSender<Vec<ControlMsgBytes>>, ports: Vec<ConsolePortInfo>, )171 async fn run_tx_ctrl_queue(
172     queue: &Arc<Mutex<Queue>>,
173     doorbell: Interrupt,
174     kick_evt: EventAsync,
175     sender: &mut mpsc::UnboundedSender<Vec<ControlMsgBytes>>,
176     ports: Vec<ConsolePortInfo>,
177 ) {
178     loop {
179         if let Err(e) = kick_evt.next_val().await {
180             error!("Failed to read kick event for tx queue: {}", e);
181             break;
182         }
183 
184         let messages = process_tx_ctrl_queue(queue, &doorbell, &ports);
185 
186         if let Err(e) = sender.send(messages).await {
187             error!("console: failed to send control msg: {}", e);
188             break;
189         }
190     }
191 }
192 
run_rx_ctrl_queue( queue: &Arc<Mutex<Queue>>, doorbell: Interrupt, kick_evt: EventAsync, receiver: &mut mpsc::UnboundedReceiver<Vec<ControlMsgBytes>>, )193 async fn run_rx_ctrl_queue(
194     queue: &Arc<Mutex<Queue>>,
195     doorbell: Interrupt,
196     kick_evt: EventAsync,
197     receiver: &mut mpsc::UnboundedReceiver<Vec<ControlMsgBytes>>,
198 ) {
199     loop {
200         let messages = receiver.next().await;
201 
202         if let Some(messages) = messages {
203             for mut msg in messages.into_iter() {
204                 while !msg.is_empty() {
205                     match handle_input(&doorbell, &mut msg, queue) {
206                         Ok(()) => {}
207                         Err(ConsoleError::RxDescriptorsExhausted) => {
208                             // Wait until a descriptor becomes available and try again.
209                             if let Err(e) = kick_evt.next_val().await {
210                                 error!("Failed to read kick event for rx-ctrl queue: {}", e);
211                                 return;
212                             }
213                         }
214                     }
215                 }
216             }
217         }
218     }
219 }
220 
/// Description of a single port of a multi-port virtio-console.
///
/// The default value is a non-console port with an empty name.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct ConsolePortInfo {
    /// Whether the port is announced as a console port once it reports ready.
    pub console: bool,
    /// Port name, sent as the payload of the port-name control message.
    pub name: String,
}
227 
228 /// Control port for multi-port virtio-console
229 pub struct ControlPort {
230     sender: AsyncQueueState<mpsc::UnboundedSender<Vec<ControlMsgBytes>>>,
231     receiver: AsyncQueueState<mpsc::UnboundedReceiver<Vec<ControlMsgBytes>>>,
232     ports: Vec<ConsolePortInfo>,
233 }
234 
235 impl ControlPort {
236     /// Create a control port with the given port info
new(ports: Vec<ConsolePortInfo>) -> ControlPort237     pub fn new(ports: Vec<ConsolePortInfo>) -> ControlPort {
238         let (sender, receiver) = mpsc::unbounded::<Vec<ControlMsgBytes>>();
239 
240         ControlPort {
241             sender: AsyncQueueState::Stopped(sender),
242             receiver: AsyncQueueState::Stopped(receiver),
243             ports,
244         }
245     }
246 
247     /// Start the control receiveq
start_receive_queue( &mut self, ex: &Executor, queue: Arc<Mutex<virtio::Queue>>, doorbell: Interrupt, ) -> Result<()>248     pub fn start_receive_queue(
249         &mut self,
250         ex: &Executor,
251         queue: Arc<Mutex<virtio::Queue>>,
252         doorbell: Interrupt,
253     ) -> Result<()> {
254         let kick_evt = queue
255             .lock()
256             .event()
257             .try_clone()
258             .context("Failed to clone queue event")?;
259         let kick_evt =
260             EventAsync::new(kick_evt, ex).context("Failed to create EventAsync for kick_evt")?;
261 
262         let receiver = &mut self.receiver;
263         let rx_future = |mut receiver, abort| {
264             Ok(async move {
265                 select2(
266                     run_rx_ctrl_queue(&queue, doorbell, kick_evt, &mut receiver).boxed_local(),
267                     abort,
268                 )
269                 .await;
270 
271                 receiver
272             })
273         };
274 
275         receiver.start(ex, rx_future)
276     }
277 
278     /// Stop the control receiveq
stop_receive_queue(&mut self) -> anyhow::Result<bool>279     pub fn stop_receive_queue(&mut self) -> anyhow::Result<bool> {
280         self.receiver
281             .stop()
282             .context("failed to stop control rx queue")
283     }
284 
285     /// Start the control transmitq
start_transmit_queue( &mut self, ex: &Executor, queue: Arc<Mutex<virtio::Queue>>, doorbell: Interrupt, ) -> Result<()>286     pub fn start_transmit_queue(
287         &mut self,
288         ex: &Executor,
289         queue: Arc<Mutex<virtio::Queue>>,
290         doorbell: Interrupt,
291     ) -> Result<()> {
292         let kick_evt = queue
293             .lock()
294             .event()
295             .try_clone()
296             .context("Failed to clone queue event")?;
297         let kick_evt =
298             EventAsync::new(kick_evt, ex).context("Failed to create EventAsync for kick_evt")?;
299 
300         let sender = &mut self.sender;
301         let ports = self.ports.clone();
302 
303         let tx_future = |mut sender, abort| {
304             Ok(async move {
305                 select2(
306                     run_tx_ctrl_queue(&queue, doorbell, kick_evt, &mut sender, ports).boxed_local(),
307                     abort,
308                 )
309                 .await;
310 
311                 sender
312             })
313         };
314 
315         sender.start(ex, tx_future)
316     }
317 
318     /// Stop the control transmitq
stop_transmit_queue(&mut self) -> anyhow::Result<bool>319     pub fn stop_transmit_queue(&mut self) -> anyhow::Result<bool> {
320         self.sender
321             .stop()
322             .context("failed to stop control tx queue")
323     }
324 }
325