1 // Copyright 2022 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 use std::collections::VecDeque;
6 use std::io;
7 use std::sync::Arc;
8 use std::time::Duration;
9 use std::time::Instant;
10
11 use base::error;
12 use base::Event;
13 use base::EventToken;
14 use base::FileSync;
15 use base::RawDescriptor;
16 use base::WaitContext;
17 use base::WorkerThread;
18 use sync::Mutex;
19
20 use crate::serial::sys::InStreamType;
21 use crate::serial_device::SerialInput;
22 use crate::serial_device::SerialOptions;
23 use crate::virtio::console::Console;
24 use crate::virtio::ProtectionType;
25 use crate::SerialDevice;
26
27 impl SerialDevice for Console {
new( protection_type: ProtectionType, _event: Event, input: Option<Box<dyn SerialInput>>, out: Option<Box<dyn io::Write + Send>>, _sync: Option<Box<dyn FileSync + Send>>, options: SerialOptions, keep_rds: Vec<RawDescriptor>, ) -> Console28 fn new(
29 protection_type: ProtectionType,
30 _event: Event,
31 input: Option<Box<dyn SerialInput>>,
32 out: Option<Box<dyn io::Write + Send>>,
33 // TODO(b/171331752): connect filesync functionality.
34 _sync: Option<Box<dyn FileSync + Send>>,
35 options: SerialOptions,
36 keep_rds: Vec<RawDescriptor>,
37 ) -> Console {
38 Console::new(protection_type, input, out, keep_rds, options.pci_address)
39 }
40 }
41
is_a_fatal_input_error(e: &io::Error) -> bool42 fn is_a_fatal_input_error(e: &io::Error) -> bool {
43 e.kind() != io::ErrorKind::Interrupted
44 }
45
46 /// Starts a thread that reads rx and sends the input back via the returned buffer.
47 ///
48 /// The caller should listen on `in_avail_evt` for events. When `in_avail_evt` signals that data
49 /// is available, the caller should lock the returned `Mutex` and read data out of the inner
50 /// `VecDeque`. The data should be removed from the beginning of the `VecDeque` as it is processed.
51 ///
52 /// # Arguments
53 ///
54 /// * `rx` - Data source that the reader thread will wait on to send data back to the buffer
55 /// * `in_avail_evt` - Event triggered by the thread when new input is available on the buffer
spawn_input_thread( mut rx: InStreamType, in_avail_evt: &Event, input_buffer: VecDeque<u8>, ) -> (Arc<Mutex<VecDeque<u8>>>, WorkerThread<InStreamType>)56 pub(in crate::virtio::console) fn spawn_input_thread(
57 mut rx: InStreamType,
58 in_avail_evt: &Event,
59 input_buffer: VecDeque<u8>,
60 ) -> (Arc<Mutex<VecDeque<u8>>>, WorkerThread<InStreamType>) {
61 let buffer = Arc::new(Mutex::new(input_buffer));
62 let buffer_cloned = buffer.clone();
63
64 let thread_in_avail_evt = in_avail_evt
65 .try_clone()
66 .expect("failed to clone in_avail_evt");
67
68 let res = WorkerThread::start("v_console_input", move |kill_evt| {
69 // If there is already data, signal immediately.
70 if !buffer.lock().is_empty() {
71 thread_in_avail_evt.signal().unwrap();
72 }
73 read_input(&mut rx, &thread_in_avail_evt, buffer, kill_evt);
74 rx
75 });
76 (buffer_cloned, res)
77 }
78
read_input( rx: &mut InStreamType, thread_in_avail_evt: &Event, buffer: Arc<Mutex<VecDeque<u8>>>, kill_evt: Event, )79 pub(in crate::virtio::console) fn read_input(
80 rx: &mut InStreamType,
81 thread_in_avail_evt: &Event,
82 buffer: Arc<Mutex<VecDeque<u8>>>,
83 kill_evt: Event,
84 ) {
85 #[derive(EventToken)]
86 enum Token {
87 ConsoleEvent,
88 Kill,
89 }
90
91 let wait_ctx: WaitContext<Token> = match WaitContext::build_with(&[
92 (&kill_evt, Token::Kill),
93 (rx.get_read_notifier(), Token::ConsoleEvent),
94 ]) {
95 Ok(ctx) => ctx,
96 Err(e) => {
97 error!("failed creating WaitContext {:?}", e);
98 return;
99 }
100 };
101
102 let mut kill_timeout = None;
103 let mut rx_buf = [0u8; 1 << 12];
104 'wait: loop {
105 let events = match wait_ctx.wait() {
106 Ok(events) => events,
107 Err(e) => {
108 error!("Failed to wait for events. {}", e);
109 return;
110 }
111 };
112 for event in events.iter() {
113 match event.token {
114 Token::Kill => {
115 // Ignore the kill event until there are no other events to process so that
116 // we drain `rx` as much as possible. The next `wait_ctx.wait()` call will
117 // immediately re-entry this case since we don't call `kill_evt.wait()`.
118 if events.iter().all(|e| matches!(e.token, Token::Kill)) {
119 break 'wait;
120 }
121 const TIMEOUT_DURATION: Duration = Duration::from_millis(500);
122 match kill_timeout {
123 None => {
124 kill_timeout = Some(Instant::now() + TIMEOUT_DURATION);
125 }
126 Some(t) => {
127 if Instant::now() >= t {
128 error!(
129 "failed to drain console input within {:?}, giving up",
130 TIMEOUT_DURATION
131 );
132 break 'wait;
133 }
134 }
135 }
136 }
137 Token::ConsoleEvent => {
138 match rx.read(&mut rx_buf) {
139 Ok(0) => break 'wait, // Assume the stream of input has ended.
140 Ok(size) => {
141 buffer.lock().extend(&rx_buf[0..size]);
142 thread_in_avail_evt.signal().unwrap();
143 }
144 Err(e) => {
145 // Being interrupted is not an error, but everything else is.
146 if is_a_fatal_input_error(&e) {
147 error!(
148 "failed to read for bytes to queue into console device: {}",
149 e
150 );
151 break 'wait;
152 }
153 }
154 }
155 }
156 }
157 }
158 }
159 }
160