1 // Copyright 2023 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 //! Implementation of control port used for multi-port enabled virtio-console
6
7 use std::collections::VecDeque;
8 use std::sync::Arc;
9
10 use anyhow::anyhow;
11 use anyhow::Context;
12 use anyhow::Result;
13 use base::debug;
14 use base::error;
15 use cros_async::select2;
16 use cros_async::EventAsync;
17 use cros_async::Executor;
18 use data_model::Le16;
19 use data_model::Le32;
20 use futures::channel::mpsc;
21 use futures::FutureExt;
22 use futures::SinkExt;
23 use futures::StreamExt;
24 use sync::Mutex;
25 use zerocopy::AsBytes;
26 use zerocopy::FromBytes;
27 use zerocopy::FromZeroes;
28
29 use super::handle_input;
30 use crate::virtio;
31 use crate::virtio::async_device::AsyncQueueState;
32 use crate::virtio::console::ConsoleError;
33 use crate::virtio::Interrupt;
34 use crate::virtio::Queue;
35 use crate::virtio::Reader;
36
37 type ControlMsgBytes = VecDeque<u8>;
38
39 #[derive(Clone, Debug, Default, FromZeroes, FromBytes, AsBytes)]
40 #[repr(C)]
41 struct ControlMsg {
42 id: Le32,
43 event: Le16,
44 value: Le16,
45 }
46
47 impl ControlMsg {
new(id: u32, event: ControlEvent, value: u16) -> ControlMsg48 fn new(id: u32, event: ControlEvent, value: u16) -> ControlMsg {
49 ControlMsg {
50 id: Le32::from(id),
51 event: Le16::from(event as u16),
52 value: Le16::from(value),
53 }
54 }
55 }
56
57 #[derive(Debug, PartialEq, enumn::N)]
58 enum ControlEvent {
59 DeviceReady = 0,
60 DeviceAdd = 1,
61 DeviceRemove = 2,
62 PortReady = 3,
63 ConsolePort = 4,
64 Resize = 5,
65 PortOpen = 6,
66 PortName = 7,
67 }
68 impl TryFrom<u16> for ControlEvent {
69 type Error = anyhow::Error;
70
try_from(value: u16) -> Result<Self>71 fn try_from(value: u16) -> Result<Self> {
72 match ControlEvent::n(value) {
73 Some(event) => Ok(event),
74 None => Err(anyhow!("unsupported event {}", value)),
75 }
76 }
77 }
78
process_tx_ctrl_msg( reader: &mut Reader, ports: &Vec<ConsolePortInfo>, ) -> Result<Vec<ControlMsgBytes>>79 fn process_tx_ctrl_msg(
80 reader: &mut Reader,
81 ports: &Vec<ConsolePortInfo>,
82 ) -> Result<Vec<ControlMsgBytes>> {
83 let mut messages = Vec::<ControlMsgBytes>::new();
84 let ports_num = ports.len() as u32;
85 let ctrl_msg: ControlMsg = reader.read_obj().context("failed to read from reader")?;
86 let id = ctrl_msg.id.to_native();
87 let event = ControlEvent::try_from(ctrl_msg.event.to_native())?;
88 let value: u16 = ctrl_msg.value.to_native();
89
90 if id >= ports_num && event != ControlEvent::DeviceReady {
91 return Err(anyhow!("console: id {} out of range", id));
92 }
93
94 match event {
95 ControlEvent::DeviceReady => {
96 // value of 1 indicates success, and 0 indicates failure
97 if value == 1 {
98 for id in 0..ports_num {
99 let msg = ControlMsg::new(id, ControlEvent::DeviceAdd, 0);
100 let _ = msg.as_bytes();
101 messages.push(msg.as_bytes().to_owned().into());
102
103 let name = ports[id as usize].name.clone();
104 let msg = ControlMsg::new(id, ControlEvent::PortName, 0);
105 let mut reply: ControlMsgBytes = msg.as_bytes().to_owned().into();
106 reply.extend(name.as_bytes());
107 messages.push(reply);
108 }
109 } else {
110 error!("console: received event {:?} value {}", event, value);
111 }
112 }
113 ControlEvent::PortReady => {
114 // value of 1 indicates success, and 0 indicates failure
115 if value == 1 {
116 let msg = ControlMsg::new(id, ControlEvent::PortOpen, 1);
117 messages.push(msg.as_bytes().to_owned().into());
118
119 let is_console = ports[id as usize].console;
120 if is_console {
121 let msg = ControlMsg::new(id, ControlEvent::ConsolePort, 1);
122 messages.push(msg.as_bytes().to_owned().into());
123 }
124 } else {
125 error!("console: received event {:?} value {}", event, value);
126 }
127 }
128 ControlEvent::PortOpen => match value {
129 // Currently, port state change is not supported, default is open.
130 // And only print debug info here.
131 0 => debug!("console port{} close", id),
132 1 => debug!("console port{} open", id),
133 _ => error!("console port{} open {}", id, value),
134 },
135 _ => {
136 return Err(anyhow!("console: unexpected control event {:?}", event));
137 }
138 }
139
140 Ok(messages)
141 }
142
process_tx_ctrl_queue( queue: &Arc<Mutex<Queue>>, doorbell: &Interrupt, ports: &Vec<ConsolePortInfo>, ) -> Vec<ControlMsgBytes>143 fn process_tx_ctrl_queue(
144 queue: &Arc<Mutex<Queue>>,
145 doorbell: &Interrupt,
146 ports: &Vec<ConsolePortInfo>,
147 ) -> Vec<ControlMsgBytes> {
148 let mut needs_interrupt = false;
149 let mut messages = Vec::<ControlMsgBytes>::new();
150 let mut queue = queue.try_lock().expect("Lock should not be unavailable");
151
152 while let Some(mut avail_desc) = queue.pop() {
153 match process_tx_ctrl_msg(&mut avail_desc.reader, ports) {
154 Ok(mut msg) => messages.append(&mut msg),
155 Err(e) => {
156 error!("console: failed to handle control msg: {}", e);
157 }
158 }
159
160 queue.add_used(avail_desc, 0);
161 needs_interrupt = true;
162 }
163
164 if needs_interrupt {
165 queue.trigger_interrupt(doorbell);
166 }
167
168 messages
169 }
170
run_tx_ctrl_queue( queue: &Arc<Mutex<Queue>>, doorbell: Interrupt, kick_evt: EventAsync, sender: &mut mpsc::UnboundedSender<Vec<ControlMsgBytes>>, ports: Vec<ConsolePortInfo>, )171 async fn run_tx_ctrl_queue(
172 queue: &Arc<Mutex<Queue>>,
173 doorbell: Interrupt,
174 kick_evt: EventAsync,
175 sender: &mut mpsc::UnboundedSender<Vec<ControlMsgBytes>>,
176 ports: Vec<ConsolePortInfo>,
177 ) {
178 loop {
179 if let Err(e) = kick_evt.next_val().await {
180 error!("Failed to read kick event for tx queue: {}", e);
181 break;
182 }
183
184 let messages = process_tx_ctrl_queue(queue, &doorbell, &ports);
185
186 if let Err(e) = sender.send(messages).await {
187 error!("console: failed to send control msg: {}", e);
188 break;
189 }
190 }
191 }
192
run_rx_ctrl_queue( queue: &Arc<Mutex<Queue>>, doorbell: Interrupt, kick_evt: EventAsync, receiver: &mut mpsc::UnboundedReceiver<Vec<ControlMsgBytes>>, )193 async fn run_rx_ctrl_queue(
194 queue: &Arc<Mutex<Queue>>,
195 doorbell: Interrupt,
196 kick_evt: EventAsync,
197 receiver: &mut mpsc::UnboundedReceiver<Vec<ControlMsgBytes>>,
198 ) {
199 loop {
200 let messages = receiver.next().await;
201
202 if let Some(messages) = messages {
203 for mut msg in messages.into_iter() {
204 while !msg.is_empty() {
205 match handle_input(&doorbell, &mut msg, queue) {
206 Ok(()) => {}
207 Err(ConsoleError::RxDescriptorsExhausted) => {
208 // Wait until a descriptor becomes available and try again.
209 if let Err(e) = kick_evt.next_val().await {
210 error!("Failed to read kick event for rx-ctrl queue: {}", e);
211 return;
212 }
213 }
214 }
215 }
216 }
217 }
218 }
219 }
220
221 /// Each port info for multi-port virtio-console
222 #[derive(Default, Clone)]
223 pub struct ConsolePortInfo {
224 pub console: bool,
225 pub name: String,
226 }
227
228 /// Control port for multi-port virtio-console
229 pub struct ControlPort {
230 sender: AsyncQueueState<mpsc::UnboundedSender<Vec<ControlMsgBytes>>>,
231 receiver: AsyncQueueState<mpsc::UnboundedReceiver<Vec<ControlMsgBytes>>>,
232 ports: Vec<ConsolePortInfo>,
233 }
234
235 impl ControlPort {
236 /// Create a control port with the given port info
new(ports: Vec<ConsolePortInfo>) -> ControlPort237 pub fn new(ports: Vec<ConsolePortInfo>) -> ControlPort {
238 let (sender, receiver) = mpsc::unbounded::<Vec<ControlMsgBytes>>();
239
240 ControlPort {
241 sender: AsyncQueueState::Stopped(sender),
242 receiver: AsyncQueueState::Stopped(receiver),
243 ports,
244 }
245 }
246
247 /// Start the control receiveq
start_receive_queue( &mut self, ex: &Executor, queue: Arc<Mutex<virtio::Queue>>, doorbell: Interrupt, ) -> Result<()>248 pub fn start_receive_queue(
249 &mut self,
250 ex: &Executor,
251 queue: Arc<Mutex<virtio::Queue>>,
252 doorbell: Interrupt,
253 ) -> Result<()> {
254 let kick_evt = queue
255 .lock()
256 .event()
257 .try_clone()
258 .context("Failed to clone queue event")?;
259 let kick_evt =
260 EventAsync::new(kick_evt, ex).context("Failed to create EventAsync for kick_evt")?;
261
262 let receiver = &mut self.receiver;
263 let rx_future = |mut receiver, abort| {
264 Ok(async move {
265 select2(
266 run_rx_ctrl_queue(&queue, doorbell, kick_evt, &mut receiver).boxed_local(),
267 abort,
268 )
269 .await;
270
271 receiver
272 })
273 };
274
275 receiver.start(ex, rx_future)
276 }
277
278 /// Stop the control receiveq
stop_receive_queue(&mut self) -> anyhow::Result<bool>279 pub fn stop_receive_queue(&mut self) -> anyhow::Result<bool> {
280 self.receiver
281 .stop()
282 .context("failed to stop control rx queue")
283 }
284
285 /// Start the control transmitq
start_transmit_queue( &mut self, ex: &Executor, queue: Arc<Mutex<virtio::Queue>>, doorbell: Interrupt, ) -> Result<()>286 pub fn start_transmit_queue(
287 &mut self,
288 ex: &Executor,
289 queue: Arc<Mutex<virtio::Queue>>,
290 doorbell: Interrupt,
291 ) -> Result<()> {
292 let kick_evt = queue
293 .lock()
294 .event()
295 .try_clone()
296 .context("Failed to clone queue event")?;
297 let kick_evt =
298 EventAsync::new(kick_evt, ex).context("Failed to create EventAsync for kick_evt")?;
299
300 let sender = &mut self.sender;
301 let ports = self.ports.clone();
302
303 let tx_future = |mut sender, abort| {
304 Ok(async move {
305 select2(
306 run_tx_ctrl_queue(&queue, doorbell, kick_evt, &mut sender, ports).boxed_local(),
307 abort,
308 )
309 .await;
310
311 sender
312 })
313 };
314
315 sender.start(ex, tx_future)
316 }
317
318 /// Stop the control transmitq
stop_transmit_queue(&mut self) -> anyhow::Result<bool>319 pub fn stop_transmit_queue(&mut self) -> anyhow::Result<bool> {
320 self.sender
321 .stop()
322 .context("failed to stop control tx queue")
323 }
324 }
325