• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 use std::io;
6 use std::io::Write;
7 use std::thread;
8 use std::thread::JoinHandle;
9 use std::time::Duration;
10 
11 use base::error;
12 use base::named_pipes::PipeConnection;
13 use base::Event;
14 use base::EventToken;
15 use base::FileSync;
16 use base::RawDescriptor;
17 use base::Result;
18 use base::WaitContext;
19 use hypervisor::ProtectionType;
20 
21 use crate::bus::BusDevice;
22 use crate::serial_device::SerialInput;
23 use crate::sys::serial_device::SerialDevice;
24 use crate::Serial;
25 
// TODO(b/234469655): Remove type alias once ReadNotifier is implemented for
// PipeConnection.
/// Type of the input stream stored in `SystemSerialParams::in_stream`: a boxed named-pipe
/// connection.
pub(crate) type InStreamType = Box<PipeConnection>;
29 
/// Windows specific parameters for the serial device.
pub struct SystemSerialParams {
    /// Input end of the named pipe. Set to `Some` by `new_with_pipe` and to `None` by `new`.
    pub in_stream: Option<InStreamType>,
    /// File-sync handle. `handle_sync_thread` takes it out of this field and moves it into the
    /// sync worker, leaving `None` behind.
    pub sync: Option<Box<dyn FileSync + Send>>,
    /// Join handle of the sync worker thread, stored once the thread has been spawned and
    /// joined when the device is dropped.
    pub sync_thread: Option<JoinHandle<SyncWorker>>,
    /// Device-side handle of the kill event; signaled when the device is dropped so the sync
    /// worker exits.
    pub kill_evt: Option<Event>,
}
37 
38 impl Serial {
39     // Spawn the worker thread if it hasn't been spawned yet.
handle_sync_thread(&mut self)40     pub(in crate::serial) fn handle_sync_thread(&mut self) {
41         if self.system_params.sync.is_some() {
42             let sync = match self.system_params.sync.take() {
43                 Some(sync) => sync,
44                 None => return,
45             };
46 
47             let (self_kill_evt, kill_evt) = match Event::new().and_then(|e| Ok((e.try_clone()?, e)))
48             {
49                 Ok(v) => v,
50                 Err(e) => {
51                     error!("failed creating kill Event pair: {}", e);
52                     return;
53                 }
54             };
55             self.system_params.kill_evt = Some(self_kill_evt);
56 
57             match thread::Builder::new()
58                 .name(format!("{} sync thread", self.debug_label()))
59                 .spawn(move || {
60                     let mut worker = SyncWorker {
61                         kill_evt,
62                         file: sync,
63                     };
64                     worker.run();
65                     worker
66                 }) {
67                 Err(e) => {
68                     error!("failed to spawn sync thread: {}", e);
69                 }
70                 Ok(sync_thread) => self.system_params.sync_thread = Some(sync_thread),
71             };
72         }
73     }
74 }
75 
76 impl SerialDevice for Serial {
77     /// Constructs a Serial device ready for input and output.
78     ///
79     /// The stream `input` should not block, instead returning 0 bytes if are no bytes available.
new( _protection_type: ProtectionType, interrupt_evt: Event, input: Option<Box<dyn SerialInput>>, out: Option<Box<dyn io::Write + Send>>, sync: Option<Box<dyn FileSync + Send>>, out_timestamp: bool, _keep_rds: Vec<RawDescriptor>, ) -> Serial80     fn new(
81         _protection_type: ProtectionType,
82         interrupt_evt: Event,
83         input: Option<Box<dyn SerialInput>>,
84         out: Option<Box<dyn io::Write + Send>>,
85         sync: Option<Box<dyn FileSync + Send>>,
86         out_timestamp: bool,
87         _keep_rds: Vec<RawDescriptor>,
88     ) -> Serial {
89         let system_params = SystemSerialParams {
90             in_stream: None,
91             sync,
92             sync_thread: None,
93             kill_evt: None,
94         };
95         Serial::new_common(interrupt_evt, input, out, out_timestamp, system_params)
96     }
97 
98     /// Constructs a Serial device connected to a named pipe for I/O
99     ///
100     /// pipe_in and pipe_out should both refer to the same end of the same pipe, but may have
101     /// different underlying descriptors.
new_with_pipe( _protection_type: ProtectionType, interrupt_evt: Event, pipe_in: PipeConnection, pipe_out: PipeConnection, _keep_rds: Vec<RawDescriptor>, ) -> Serial102     fn new_with_pipe(
103         _protection_type: ProtectionType,
104         interrupt_evt: Event,
105         pipe_in: PipeConnection,
106         pipe_out: PipeConnection,
107         _keep_rds: Vec<RawDescriptor>,
108     ) -> Serial {
109         let system_params = SystemSerialParams {
110             in_stream: Some(Box::new(pipe_in)),
111             sync: None,
112             sync_thread: None,
113             kill_evt: None,
114         };
115         let out_timestamp = false;
116         Serial::new_common(
117             interrupt_evt,
118             None,
119             Some(Box::new(pipe_out)),
120             out_timestamp,
121             system_params,
122         )
123     }
124 }
125 
126 impl Drop for Serial {
drop(&mut self)127     fn drop(&mut self) {
128         if let Some(kill_evt) = self.system_params.kill_evt.take() {
129             // Ignore the result because there is nothing we can do about it.
130             let _ = kill_evt.signal();
131         }
132 
133         if let Some(sync_thread) = self.system_params.sync_thread.take() {
134             let _ = sync_thread.join();
135         }
136     }
137 }
138 
/// Worker to help with flushing contents of `file` to disk.
pub struct SyncWorker {
    /// Event that, once signaled, makes `run` perform a final fsync and return.
    kill_evt: Event,
    /// Target on which `run` calls `fsync`.
    file: Box<dyn FileSync + Send>,
}
144 
145 impl SyncWorker {
run(&mut self)146     pub(in crate::serial) fn run(&mut self) {
147         let mut timer = match base::Timer::new() {
148             Err(e) => {
149                 error!("failed to create timer for SyncWorker: {}", e);
150                 return;
151             }
152             Ok(timer) => timer,
153         };
154 
155         if let Err(e) = timer.reset(Duration::from_secs(1), Some(Duration::from_secs(1))) {
156             error!("failed to set timer for SyncWorker: {}", e);
157             return;
158         }
159 
160         #[derive(EventToken)]
161         enum Token {
162             Sync,
163             Kill,
164         }
165 
166         let wait_ctx: WaitContext<Token> = match WaitContext::build_with(&[
167             (&timer, Token::Sync),
168             (&self.kill_evt, Token::Kill),
169         ]) {
170             Ok(ec) => ec,
171             Err(e) => {
172                 error!("failed creating WaitContext: {}", e);
173                 return;
174             }
175         };
176         loop {
177             let events = match wait_ctx.wait() {
178                 Ok(v) => v,
179                 Err(e) => {
180                     error!("failed polling for events: {}", e);
181                     return;
182                 }
183             };
184 
185             for event in events.iter().filter(|e| e.is_readable) {
186                 match event.token {
187                     Token::Sync => {
188                         if let Err(e) = self.file.fsync() {
189                             error!("failed to fsync serial device, stopping sync thread: {}", e);
190                             return;
191                         }
192                     }
193                     Token::Kill => {
194                         if let Err(e) = self.file.fsync() {
195                             error!("failed to fsync serial device, stopping sync thread: {}", e);
196                             return;
197                         }
198                         return;
199                     }
200                 }
201             }
202         }
203     }
204 }
205 
#[cfg(test)]
mod tests {
    use super::*;
    use crate::serial::tests::*;
    use crate::serial::*;

    // Round-trips bytes between a pipe-backed Serial device and a client connected to the
    // same named pipe.
    #[cfg(windows)]
    #[test]
    fn named_pipe() {
        use base::named_pipes;
        use base::named_pipes::BlockingMode;
        use base::named_pipes::FramingMode;
        use rand::Rng;

        // Random suffix in the pipe name; presumably to avoid clashes between concurrent or
        // repeated test runs.
        let path_str = format!(r"\\.\pipe\crosvm_test_{}", rand::thread_rng().gen::<u64>());

        let pipe_in = named_pipes::create_server_pipe(
            &path_str,
            &FramingMode::Byte,
            &BlockingMode::NoWait,
            0, // default timeout
            named_pipes::DEFAULT_BUFFER_SIZE,
            false,
        )
        .unwrap();

        // `new_with_pipe` takes two handles to the same end of the pipe, so clone the server end.
        let pipe_out = pipe_in.try_clone().unwrap();
        let event = Event::new().unwrap();

        let mut device = Serial::new_with_pipe(
            ProtectionType::Unprotected,
            event,
            pipe_in,
            pipe_out,
            Vec::new(),
        );

        // Connect a client to the other end of the pipe the device is serving.
        let client_pipe = named_pipes::create_client_pipe(
            &path_str,
            &FramingMode::Byte,
            &BlockingMode::Wait,
            false,
        )
        .unwrap();

        // NOTE(review): the raw pipe read/write calls are presumably what require `unsafe`
        // here — confirm against `PipeConnection` and consider adding a SAFETY comment.
        unsafe {
            // Check that serial output is sent to the pipe
            device.write(serial_bus_address(DATA), &[b'T']);
            device.write(serial_bus_address(DATA), &[b'D']);

            let mut read_buf: [u8; 2] = [0; 2];

            assert_eq!(client_pipe.read(&mut read_buf).unwrap(), 2);
            assert_eq!(read_buf, [b'T', b'D']);

            // Check that pipe_in is the other end of client_pipe. It's not actually wired up to
            // SerialInput in this file so we can't test the data flow
            client_pipe
                .write(&[1, 2])
                .expect("Failed to write to client pipe");
            // Read straight from the device's stored input stream rather than through the
            // emulated serial registers.
            assert_eq!(
                device
                    .system_params
                    .in_stream
                    .as_mut()
                    .unwrap()
                    .read(&mut read_buf)
                    .unwrap(),
                2
            );
            assert_eq!(read_buf, [1, 2]);
        }
    }
}
280