• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 use std::collections::VecDeque;
6 use std::io;
7 use std::sync::Arc;
8 use std::time::Duration;
9 use std::time::Instant;
10 
11 use base::error;
12 use base::Event;
13 use base::EventToken;
14 use base::FileSync;
15 use base::RawDescriptor;
16 use base::WaitContext;
17 use base::WorkerThread;
18 use sync::Mutex;
19 
20 use crate::serial::sys::InStreamType;
21 use crate::serial_device::SerialInput;
22 use crate::serial_device::SerialOptions;
23 use crate::virtio::console::Console;
24 use crate::virtio::ProtectionType;
25 use crate::SerialDevice;
26 
27 impl SerialDevice for Console {
new( protection_type: ProtectionType, _event: Event, input: Option<Box<dyn SerialInput>>, out: Option<Box<dyn io::Write + Send>>, _sync: Option<Box<dyn FileSync + Send>>, options: SerialOptions, keep_rds: Vec<RawDescriptor>, ) -> Console28     fn new(
29         protection_type: ProtectionType,
30         _event: Event,
31         input: Option<Box<dyn SerialInput>>,
32         out: Option<Box<dyn io::Write + Send>>,
33         // TODO(b/171331752): connect filesync functionality.
34         _sync: Option<Box<dyn FileSync + Send>>,
35         options: SerialOptions,
36         keep_rds: Vec<RawDescriptor>,
37     ) -> Console {
38         Console::new(protection_type, input, out, keep_rds, options.pci_address)
39     }
40 }
41 
/// Returns `true` for every I/O error except [`io::ErrorKind::Interrupted`], which is the only
/// kind treated as non-fatal.
fn is_a_fatal_input_error(e: &io::Error) -> bool {
    !matches!(e.kind(), io::ErrorKind::Interrupted)
}
45 
46 /// Starts a thread that reads rx and sends the input back via the returned buffer.
47 ///
48 /// The caller should listen on `in_avail_evt` for events. When `in_avail_evt` signals that data
49 /// is available, the caller should lock the returned `Mutex` and read data out of the inner
50 /// `VecDeque`. The data should be removed from the beginning of the `VecDeque` as it is processed.
51 ///
52 /// # Arguments
53 ///
54 /// * `rx` - Data source that the reader thread will wait on to send data back to the buffer
55 /// * `in_avail_evt` - Event triggered by the thread when new input is available on the buffer
spawn_input_thread( mut rx: InStreamType, in_avail_evt: &Event, input_buffer: VecDeque<u8>, ) -> (Arc<Mutex<VecDeque<u8>>>, WorkerThread<InStreamType>)56 pub(in crate::virtio::console) fn spawn_input_thread(
57     mut rx: InStreamType,
58     in_avail_evt: &Event,
59     input_buffer: VecDeque<u8>,
60 ) -> (Arc<Mutex<VecDeque<u8>>>, WorkerThread<InStreamType>) {
61     let buffer = Arc::new(Mutex::new(input_buffer));
62     let buffer_cloned = buffer.clone();
63 
64     let thread_in_avail_evt = in_avail_evt
65         .try_clone()
66         .expect("failed to clone in_avail_evt");
67 
68     let res = WorkerThread::start("v_console_input", move |kill_evt| {
69         // If there is already data, signal immediately.
70         if !buffer.lock().is_empty() {
71             thread_in_avail_evt.signal().unwrap();
72         }
73         read_input(&mut rx, &thread_in_avail_evt, buffer, kill_evt);
74         rx
75     });
76     (buffer_cloned, res)
77 }
78 
read_input( rx: &mut InStreamType, thread_in_avail_evt: &Event, buffer: Arc<Mutex<VecDeque<u8>>>, kill_evt: Event, )79 pub(in crate::virtio::console) fn read_input(
80     rx: &mut InStreamType,
81     thread_in_avail_evt: &Event,
82     buffer: Arc<Mutex<VecDeque<u8>>>,
83     kill_evt: Event,
84 ) {
85     #[derive(EventToken)]
86     enum Token {
87         ConsoleEvent,
88         Kill,
89     }
90 
91     let wait_ctx: WaitContext<Token> = match WaitContext::build_with(&[
92         (&kill_evt, Token::Kill),
93         (rx.get_read_notifier(), Token::ConsoleEvent),
94     ]) {
95         Ok(ctx) => ctx,
96         Err(e) => {
97             error!("failed creating WaitContext {:?}", e);
98             return;
99         }
100     };
101 
102     let mut kill_timeout = None;
103     let mut rx_buf = [0u8; 1 << 12];
104     'wait: loop {
105         let events = match wait_ctx.wait() {
106             Ok(events) => events,
107             Err(e) => {
108                 error!("Failed to wait for events. {}", e);
109                 return;
110             }
111         };
112         for event in events.iter() {
113             match event.token {
114                 Token::Kill => {
115                     // Ignore the kill event until there are no other events to process so that
116                     // we drain `rx` as much as possible. The next `wait_ctx.wait()` call will
117                     // immediately re-entry this case since we don't call `kill_evt.wait()`.
118                     if events.iter().all(|e| matches!(e.token, Token::Kill)) {
119                         break 'wait;
120                     }
121                     const TIMEOUT_DURATION: Duration = Duration::from_millis(500);
122                     match kill_timeout {
123                         None => {
124                             kill_timeout = Some(Instant::now() + TIMEOUT_DURATION);
125                         }
126                         Some(t) => {
127                             if Instant::now() >= t {
128                                 error!(
129                                     "failed to drain console input within {:?}, giving up",
130                                     TIMEOUT_DURATION
131                                 );
132                                 break 'wait;
133                             }
134                         }
135                     }
136                 }
137                 Token::ConsoleEvent => {
138                     match rx.read(&mut rx_buf) {
139                         Ok(0) => break 'wait, // Assume the stream of input has ended.
140                         Ok(size) => {
141                             buffer.lock().extend(&rx_buf[0..size]);
142                             thread_in_avail_evt.signal().unwrap();
143                         }
144                         Err(e) => {
145                             // Being interrupted is not an error, but everything else is.
146                             if is_a_fatal_input_error(&e) {
147                                 error!(
148                                     "failed to read for bytes to queue into console device: {}",
149                                     e
150                                 );
151                                 break 'wait;
152                             }
153                         }
154                     }
155                 }
156             }
157         }
158     }
159 }
160