• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2024 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 //! Virtio console device worker thread.
6 
7 use std::collections::BTreeMap;
8 use std::collections::VecDeque;
9 use std::sync::mpsc;
10 use std::sync::Arc;
11 
12 use anyhow::anyhow;
13 use anyhow::Context;
14 use base::error;
15 use base::Event;
16 use base::EventToken;
17 use base::WaitContext;
18 use base::WorkerThread;
19 use sync::Mutex;
20 
21 use crate::virtio::console::control::process_control_receive_queue;
22 use crate::virtio::console::control::process_control_transmit_queue;
23 use crate::virtio::console::control::ControlMsgBytes;
24 use crate::virtio::console::input::process_receive_queue;
25 use crate::virtio::console::output::process_transmit_queue;
26 use crate::virtio::console::port::ConsolePort;
27 use crate::virtio::console::port::ConsolePortInfo;
28 use crate::virtio::Queue;
29 
// Virtqueue index layout used by this worker: port 0's queue pair comes first, the control queue
// pair second, and every port after port 0 takes the next consecutive receiveq/transmitq pair
// starting at the port 1 indices.

/// Queue index of the receiveq that belongs to port 0.
const PORT0_RECEIVEQ_IDX: usize = 0;
/// Queue index of the transmitq that belongs to port 0.
const PORT0_TRANSMITQ_IDX: usize = 1;
/// Queue index of the control receiveq.
const CONTROL_RECEIVEQ_IDX: usize = 2;
/// Queue index of the control transmitq.
const CONTROL_TRANSMITQ_IDX: usize = 3;
/// Queue index of the receiveq that belongs to port 1.
const PORT1_RECEIVEQ_IDX: usize = 4;
/// Queue index of the transmitq that belongs to port 1.
const PORT1_TRANSMITQ_IDX: usize = 5;
36 
37 pub struct WorkerPort {
38     info: Option<ConsolePortInfo>,
39 
40     in_avail_evt: Event,
41     input_buffer: Arc<Mutex<VecDeque<u8>>>,
42     output: Box<dyn std::io::Write + Send>,
43 }
44 
45 impl WorkerPort {
from_console_port(port: &mut ConsolePort) -> WorkerPort46     pub fn from_console_port(port: &mut ConsolePort) -> WorkerPort {
47         let in_avail_evt = port.clone_in_avail_evt().unwrap();
48         let input_buffer = port.clone_input_buffer();
49         let output = port
50             .take_output()
51             .unwrap_or_else(|| Box::new(std::io::sink()));
52         let info = port.port_info().cloned();
53         WorkerPort {
54             info,
55             in_avail_evt,
56             input_buffer,
57             output,
58         }
59     }
60 
61     /// Restore the state retrieved from `ConsolePort` by `WorkerPort::from_console_port()`.
into_console_port(self, console_port: &mut ConsolePort)62     pub fn into_console_port(self, console_port: &mut ConsolePort) {
63         console_port.restore_output(self.output);
64     }
65 
is_console(&self) -> bool66     pub fn is_console(&self) -> bool {
67         self.info
68             .as_ref()
69             .map(|info| info.console)
70             .unwrap_or_default()
71     }
72 
name(&self) -> Option<&str>73     pub fn name(&self) -> Option<&str> {
74         self.info.as_ref().and_then(ConsolePortInfo::name)
75     }
76 }
77 
78 #[derive(EventToken)]
79 enum Token {
80     ReceiveQueueAvailable(u32),
81     TransmitQueueAvailable(u32),
82     InputAvailable(u32),
83     ControlReceiveQueueAvailable,
84     ControlTransmitQueueAvailable,
85     WorkerRequest,
86     Kill,
87 }
88 
89 pub enum WorkerRequest {
90     StartQueue {
91         idx: usize,
92         queue: Queue,
93         response_sender: mpsc::SyncSender<anyhow::Result<()>>,
94     },
95     StopQueue {
96         idx: usize,
97         response_sender: mpsc::SyncSender<Option<Queue>>,
98     },
99 }
100 
101 pub struct Worker {
102     wait_ctx: WaitContext<Token>,
103 
104     // Currently running queues.
105     queues: BTreeMap<usize, Queue>,
106 
107     // Console ports indexed by port ID. At least port 0 will exist, and other ports may be
108     // available if `VIRTIO_CONSOLE_F_MULTIPORT` is enabled.
109     ports: Vec<WorkerPort>,
110 
111     // Device-to-driver messages to be received by the driver via the control receiveq.
112     pending_receive_control_msgs: VecDeque<ControlMsgBytes>,
113 
114     worker_receiver: mpsc::Receiver<WorkerRequest>,
115     worker_event: Event,
116 }
117 
118 impl Worker {
new( ports: Vec<WorkerPort>, worker_receiver: mpsc::Receiver<WorkerRequest>, worker_event: Event, ) -> anyhow::Result<Self>119     pub fn new(
120         ports: Vec<WorkerPort>,
121         worker_receiver: mpsc::Receiver<WorkerRequest>,
122         worker_event: Event,
123     ) -> anyhow::Result<Self> {
124         let wait_ctx = WaitContext::new().context("WaitContext::new() failed")?;
125 
126         wait_ctx.add(&worker_event, Token::WorkerRequest)?;
127 
128         for (index, port) in ports.iter().enumerate() {
129             let port_id = index as u32;
130             wait_ctx.add(&port.in_avail_evt, Token::InputAvailable(port_id))?;
131         }
132 
133         Ok(Worker {
134             wait_ctx,
135             queues: BTreeMap::new(),
136             ports,
137             pending_receive_control_msgs: VecDeque::new(),
138             worker_receiver,
139             worker_event,
140         })
141     }
142 
run(&mut self, kill_evt: &Event) -> anyhow::Result<()>143     pub fn run(&mut self, kill_evt: &Event) -> anyhow::Result<()> {
144         self.wait_ctx.add(kill_evt, Token::Kill)?;
145         let res = self.run_loop();
146         self.wait_ctx.delete(kill_evt)?;
147         res
148     }
149 
run_loop(&mut self) -> anyhow::Result<()>150     fn run_loop(&mut self) -> anyhow::Result<()> {
151         let mut running = true;
152         while running {
153             let events = self.wait_ctx.wait()?;
154 
155             for event in events.iter().filter(|e| e.is_readable) {
156                 match event.token {
157                     Token::TransmitQueueAvailable(port_id) => {
158                         if let (Some(port), Some(transmitq)) = (
159                             self.ports.get_mut(port_id as usize),
160                             transmitq_idx(port_id).and_then(|idx| self.queues.get_mut(&idx)),
161                         ) {
162                             transmitq
163                                 .event()
164                                 .wait()
165                                 .context("failed reading transmit queue Event")?;
166                             process_transmit_queue(transmitq, &mut port.output);
167                         }
168                     }
169                     Token::ReceiveQueueAvailable(port_id) | Token::InputAvailable(port_id) => {
170                         let port = self.ports.get_mut(port_id as usize);
171                         let receiveq =
172                             receiveq_idx(port_id).and_then(|idx| self.queues.get_mut(&idx));
173 
174                         let event = if matches!(event.token, Token::ReceiveQueueAvailable(..)) {
175                             receiveq.as_ref().map(|q| q.event())
176                         } else {
177                             port.as_ref().map(|p| &p.in_avail_evt)
178                         };
179                         if let Some(event) = event {
180                             event.wait().context("failed to clear receive event")?;
181                         }
182 
183                         if let (Some(port), Some(receiveq)) = (port, receiveq) {
184                             let mut input_buffer = port.input_buffer.lock();
185                             process_receive_queue(&mut input_buffer, receiveq);
186                         }
187                     }
188                     Token::ControlReceiveQueueAvailable => {
189                         if let Some(ctrl_receiveq) = self.queues.get_mut(&CONTROL_RECEIVEQ_IDX) {
190                             ctrl_receiveq
191                                 .event()
192                                 .wait()
193                                 .context("failed waiting on control event")?;
194                             process_control_receive_queue(
195                                 ctrl_receiveq,
196                                 &mut self.pending_receive_control_msgs,
197                             );
198                         }
199                     }
200                     Token::ControlTransmitQueueAvailable => {
201                         if let Some(ctrl_transmitq) = self.queues.get_mut(&CONTROL_TRANSMITQ_IDX) {
202                             ctrl_transmitq
203                                 .event()
204                                 .wait()
205                                 .context("failed waiting on control event")?;
206                             process_control_transmit_queue(
207                                 ctrl_transmitq,
208                                 &self.ports,
209                                 &mut self.pending_receive_control_msgs,
210                             );
211                         }
212 
213                         // Attempt to send any new replies if there is space in the receiveq.
214                         if let Some(ctrl_receiveq) = self.queues.get_mut(&CONTROL_RECEIVEQ_IDX) {
215                             process_control_receive_queue(
216                                 ctrl_receiveq,
217                                 &mut self.pending_receive_control_msgs,
218                             )
219                         }
220                     }
221                     Token::WorkerRequest => {
222                         self.worker_event.wait()?;
223                         self.process_worker_requests();
224                     }
225                     Token::Kill => running = false,
226                 }
227             }
228         }
229         Ok(())
230     }
231 
process_worker_requests(&mut self)232     fn process_worker_requests(&mut self) {
233         while let Ok(request) = self.worker_receiver.try_recv() {
234             match request {
235                 WorkerRequest::StartQueue {
236                     idx,
237                     queue,
238                     response_sender,
239                 } => {
240                     let res = self.start_queue(idx, queue);
241                     let _ = response_sender.send(res);
242                 }
243                 WorkerRequest::StopQueue {
244                     idx,
245                     response_sender,
246                 } => {
247                     let res = self.stop_queue(idx);
248                     let _ = response_sender.send(res);
249                 }
250             }
251         }
252     }
253 
start_queue(&mut self, idx: usize, queue: Queue) -> anyhow::Result<()>254     fn start_queue(&mut self, idx: usize, queue: Queue) -> anyhow::Result<()> {
255         if let Some(port_id) = receiveq_port_id(idx) {
256             self.wait_ctx
257                 .add(queue.event(), Token::ReceiveQueueAvailable(port_id))?;
258         } else if let Some(port_id) = transmitq_port_id(idx) {
259             self.wait_ctx
260                 .add(queue.event(), Token::TransmitQueueAvailable(port_id))?;
261         } else if idx == CONTROL_RECEIVEQ_IDX {
262             self.wait_ctx
263                 .add(queue.event(), Token::ControlReceiveQueueAvailable)?;
264         } else if idx == CONTROL_TRANSMITQ_IDX {
265             self.wait_ctx
266                 .add(queue.event(), Token::ControlTransmitQueueAvailable)?;
267         } else {
268             return Err(anyhow!("unhandled queue idx {idx}"));
269         }
270 
271         let prev = self.queues.insert(idx, queue);
272         assert!(prev.is_none());
273         Ok(())
274     }
275 
stop_queue(&mut self, idx: usize) -> Option<Queue>276     fn stop_queue(&mut self, idx: usize) -> Option<Queue> {
277         if let Some(queue) = self.queues.remove(&idx) {
278             let _ = self.wait_ctx.delete(queue.event());
279             Some(queue)
280         } else {
281             None
282         }
283     }
284 }
285 
286 pub struct WorkerHandle {
287     worker_thread: WorkerThread<Vec<WorkerPort>>,
288     worker_sender: mpsc::Sender<WorkerRequest>,
289     worker_event: Event,
290 }
291 
292 impl WorkerHandle {
new(ports: Vec<WorkerPort>) -> anyhow::Result<Self>293     pub fn new(ports: Vec<WorkerPort>) -> anyhow::Result<Self> {
294         let worker_event = Event::new().context("Event::new")?;
295         let worker_event_clone = worker_event.try_clone().context("Event::try_clone")?;
296         let (worker_sender, worker_receiver) = mpsc::channel();
297         let worker_thread = WorkerThread::start("v_console", move |kill_evt| {
298             let mut worker = Worker::new(ports, worker_receiver, worker_event_clone)
299                 .expect("console Worker::new() failed");
300             if let Err(e) = worker.run(&kill_evt) {
301                 error!("console worker failed: {:#}", e);
302             }
303             worker.ports
304         });
305         Ok(WorkerHandle {
306             worker_thread,
307             worker_sender,
308             worker_event,
309         })
310     }
311 
start_queue(&mut self, idx: usize, queue: Queue) -> anyhow::Result<()>312     pub fn start_queue(&mut self, idx: usize, queue: Queue) -> anyhow::Result<()> {
313         let (response_sender, response_receiver) = mpsc::sync_channel(0);
314         self.worker_sender
315             .send(WorkerRequest::StartQueue {
316                 idx,
317                 queue,
318                 response_sender,
319             })
320             .context("mpsc::Sender::send")?;
321         self.worker_event.signal().context("Event::signal")?;
322         response_receiver.recv().context("mpsc::Receiver::recv")?
323     }
324 
stop_queue(&mut self, idx: usize) -> anyhow::Result<Option<Queue>>325     pub fn stop_queue(&mut self, idx: usize) -> anyhow::Result<Option<Queue>> {
326         let (response_sender, response_receiver) = mpsc::sync_channel(0);
327         self.worker_sender
328             .send(WorkerRequest::StopQueue {
329                 idx,
330                 response_sender,
331             })
332             .context("mpsc::Sender::send")?;
333         self.worker_event.signal().context("Event::signal")?;
334         response_receiver.recv().context("mpsc::Receiver::recv")
335     }
336 
stop(self) -> Vec<WorkerPort>337     pub fn stop(self) -> Vec<WorkerPort> {
338         self.worker_thread.stop()
339     }
340 }
341 
receiveq_idx(port_id: u32) -> Option<usize>342 fn receiveq_idx(port_id: u32) -> Option<usize> {
343     if port_id == 0 {
344         Some(PORT0_RECEIVEQ_IDX)
345     } else {
346         PORT1_RECEIVEQ_IDX.checked_add((port_id - 1).checked_mul(2)?.try_into().ok()?)
347     }
348 }
349 
transmitq_idx(port_id: u32) -> Option<usize>350 fn transmitq_idx(port_id: u32) -> Option<usize> {
351     if port_id == 0 {
352         Some(PORT0_TRANSMITQ_IDX)
353     } else {
354         PORT1_TRANSMITQ_IDX.checked_add((port_id - 1).checked_mul(2)?.try_into().ok()?)
355     }
356 }
357 
receiveq_port_id(queue_idx: usize) -> Option<u32>358 fn receiveq_port_id(queue_idx: usize) -> Option<u32> {
359     if queue_idx == PORT0_RECEIVEQ_IDX {
360         Some(0)
361     } else if queue_idx >= PORT1_RECEIVEQ_IDX && (queue_idx & 1) == 0 {
362         ((queue_idx - PORT1_RECEIVEQ_IDX) / 2)
363             .checked_add(1)?
364             .try_into()
365             .ok()
366     } else {
367         None
368     }
369 }
370 
transmitq_port_id(queue_idx: usize) -> Option<u32>371 fn transmitq_port_id(queue_idx: usize) -> Option<u32> {
372     if queue_idx == PORT0_TRANSMITQ_IDX {
373         Some(0)
374     } else if queue_idx >= PORT1_TRANSMITQ_IDX && (queue_idx & 1) == 1 {
375         ((queue_idx - PORT1_TRANSMITQ_IDX) / 2)
376             .checked_add(1)?
377             .try_into()
378             .ok()
379     } else {
380         None
381     }
382 }
383 
#[cfg(test)]
mod tests {
    use super::*;

    /// Port IDs 0 through 3 map to the expected receiveq indices.
    #[test]
    fn test_receiveq_idx() {
        let expected = [(0, 0), (1, 4), (2, 6), (3, 8)];
        for (port_id, queue_idx) in expected {
            assert_eq!(receiveq_idx(port_id), Some(queue_idx), "port {port_id}");
        }
    }

    /// Port IDs 0 through 3 map to the expected transmitq indices.
    #[test]
    fn test_transmitq_idx() {
        let expected = [(0, 1), (1, 5), (2, 7), (3, 9)];
        for (port_id, queue_idx) in expected {
            assert_eq!(transmitq_idx(port_id), Some(queue_idx), "port {port_id}");
        }
    }

    /// Queue indices 0 through 9 map to a port only when they are a port's receiveq.
    #[test]
    fn test_receiveq_port_id() {
        // Position in the array is the queue index.
        let expected: [Option<u32>; 10] = [
            Some(0),
            None, // port0 transmitq
            None, // ctrl receiveq
            None, // ctrl transmitq
            Some(1),
            None,
            Some(2),
            None,
            Some(3),
            None,
        ];
        for (queue_idx, port_id) in expected.into_iter().enumerate() {
            assert_eq!(receiveq_port_id(queue_idx), port_id, "queue {queue_idx}");
        }
    }

    /// Queue indices 0 through 9 map to a port only when they are a port's transmitq.
    #[test]
    fn test_transmitq_port_id() {
        // Position in the array is the queue index.
        let expected: [Option<u32>; 10] = [
            None, // port0 receiveq
            Some(0),
            None, // ctrl receiveq
            None, // ctrl transmitq
            None, // port1 receiveq
            Some(1),
            None,
            Some(2),
            None,
            Some(3),
        ];
        for (queue_idx, port_id) in expected.into_iter().enumerate() {
            assert_eq!(transmitq_port_id(queue_idx), port_id, "queue {queue_idx}");
        }
    }
}
432