• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 use std::io;
6 use std::io::Write;
7 use std::thread;
8 use std::thread::JoinHandle;
9 use std::time::Duration;
10 
11 use base::error;
12 use base::named_pipes::PipeConnection;
13 use base::Event;
14 use base::EventToken;
15 use base::FileSync;
16 use base::RawDescriptor;
17 use base::Result;
18 use base::TimerTrait;
19 use base::WaitContext;
20 use hypervisor::ProtectionType;
21 
22 use crate::bus::BusDevice;
23 use crate::serial_device::SerialInput;
24 use crate::serial_device::SerialOptions;
25 use crate::sys::serial_device::SerialDevice;
26 use crate::Serial;
27 
// TODO(b/234469655): Remove type alias once ReadNotifier is implemented for
// PipeConnection.
/// Boxed named-pipe connection held as the serial device's input stream (see
/// `SystemSerialParams::in_stream`).
pub(crate) type InStreamType = Box<PipeConnection>;
31 
/// Windows specific parameters for the serial device.
pub struct SystemSerialParams {
    /// Input side of the named pipe; populated by `new_with_pipe`, `None` when built via `new`.
    pub in_stream: Option<InStreamType>,
    /// Handle to flush periodically. `handle_sync_thread` moves it out (leaving `None`) when it
    /// tries to start the sync worker.
    pub sync: Option<Box<dyn FileSync + Send>>,
    /// Join handle of the sync worker thread, set once the thread has been spawned.
    pub sync_thread: Option<JoinHandle<SyncWorker>>,
    /// Event signaled from `Drop` to ask the sync worker to stop.
    pub kill_evt: Option<Event>,
}
39 
40 impl Serial {
41     // Spawn the worker thread if it hasn't been spawned yet.
handle_sync_thread(&mut self)42     pub(in crate::serial) fn handle_sync_thread(&mut self) {
43         if self.system_params.sync.is_some() {
44             let sync = match self.system_params.sync.take() {
45                 Some(sync) => sync,
46                 None => return,
47             };
48 
49             let (self_kill_evt, kill_evt) = match Event::new().and_then(|e| Ok((e.try_clone()?, e)))
50             {
51                 Ok(v) => v,
52                 Err(e) => {
53                     error!("failed creating kill Event pair: {}", e);
54                     return;
55                 }
56             };
57             self.system_params.kill_evt = Some(self_kill_evt);
58 
59             match thread::Builder::new()
60                 .name(format!("{} sync thread", self.debug_label()))
61                 .spawn(move || {
62                     let mut worker = SyncWorker {
63                         kill_evt,
64                         file: sync,
65                     };
66                     worker.run();
67                     worker
68                 }) {
69                 Err(e) => {
70                     error!("failed to spawn sync thread: {}", e);
71                 }
72                 Ok(sync_thread) => self.system_params.sync_thread = Some(sync_thread),
73             };
74         }
75     }
76 }
77 
78 impl SerialDevice for Serial {
79     /// Constructs a Serial device ready for input and output.
80     ///
81     /// The stream `input` should not block, instead returning 0 bytes if are no bytes available.
new( _protection_type: ProtectionType, interrupt_evt: Event, input: Option<Box<dyn SerialInput>>, out: Option<Box<dyn io::Write + Send>>, sync: Option<Box<dyn FileSync + Send>>, options: SerialOptions, _keep_rds: Vec<RawDescriptor>, ) -> Serial82     fn new(
83         _protection_type: ProtectionType,
84         interrupt_evt: Event,
85         input: Option<Box<dyn SerialInput>>,
86         out: Option<Box<dyn io::Write + Send>>,
87         sync: Option<Box<dyn FileSync + Send>>,
88         options: SerialOptions,
89         _keep_rds: Vec<RawDescriptor>,
90     ) -> Serial {
91         let system_params = SystemSerialParams {
92             in_stream: None,
93             sync,
94             sync_thread: None,
95             kill_evt: None,
96         };
97         Serial::new_common(
98             interrupt_evt,
99             input,
100             out,
101             options.out_timestamp,
102             system_params,
103         )
104     }
105 
106     /// Constructs a Serial device connected to a named pipe for I/O
107     ///
108     /// pipe_in and pipe_out should both refer to the same end of the same pipe, but may have
109     /// different underlying descriptors.
new_with_pipe( _protection_type: ProtectionType, interrupt_evt: Event, pipe_in: PipeConnection, pipe_out: PipeConnection, _options: SerialOptions, _keep_rds: Vec<RawDescriptor>, ) -> Serial110     fn new_with_pipe(
111         _protection_type: ProtectionType,
112         interrupt_evt: Event,
113         pipe_in: PipeConnection,
114         pipe_out: PipeConnection,
115         _options: SerialOptions,
116         _keep_rds: Vec<RawDescriptor>,
117     ) -> Serial {
118         let system_params = SystemSerialParams {
119             in_stream: Some(Box::new(pipe_in)),
120             sync: None,
121             sync_thread: None,
122             kill_evt: None,
123         };
124         let out_timestamp = false;
125         Serial::new_common(
126             interrupt_evt,
127             None,
128             Some(Box::new(pipe_out)),
129             out_timestamp,
130             system_params,
131         )
132     }
133 }
134 
135 impl Drop for Serial {
drop(&mut self)136     fn drop(&mut self) {
137         if let Some(kill_evt) = self.system_params.kill_evt.take() {
138             // Ignore the result because there is nothing we can do about it.
139             let _ = kill_evt.signal();
140         }
141 
142         if let Some(sync_thread) = self.system_params.sync_thread.take() {
143             let _ = sync_thread.join();
144         }
145     }
146 }
147 
/// Worker to help with flushing contents of `file` to disk.
pub struct SyncWorker {
    // Signaled to make `run` perform one last fsync and return.
    kill_evt: Event,
    // Target whose `fsync` is invoked by `run`.
    file: Box<dyn FileSync + Send>,
}
153 
154 impl SyncWorker {
run(&mut self)155     pub(in crate::serial) fn run(&mut self) {
156         let mut timer = match base::Timer::new() {
157             Err(e) => {
158                 error!("failed to create timer for SyncWorker: {}", e);
159                 return;
160             }
161             Ok(timer) => timer,
162         };
163 
164         if let Err(e) = timer.reset(Duration::from_secs(1), Some(Duration::from_secs(1))) {
165             error!("failed to set timer for SyncWorker: {}", e);
166             return;
167         }
168 
169         #[derive(EventToken)]
170         enum Token {
171             Sync,
172             Kill,
173         }
174 
175         let wait_ctx: WaitContext<Token> = match WaitContext::build_with(&[
176             (&timer, Token::Sync),
177             (&self.kill_evt, Token::Kill),
178         ]) {
179             Ok(ec) => ec,
180             Err(e) => {
181                 error!("failed creating WaitContext: {}", e);
182                 return;
183             }
184         };
185         loop {
186             let events = match wait_ctx.wait() {
187                 Ok(v) => v,
188                 Err(e) => {
189                     error!("failed polling for events: {}", e);
190                     return;
191                 }
192             };
193 
194             for event in events.iter().filter(|e| e.is_readable) {
195                 match event.token {
196                     Token::Sync => {
197                         if let Err(e) = self.file.fsync() {
198                             error!("failed to fsync serial device, stopping sync thread: {}", e);
199                             return;
200                         }
201                     }
202                     Token::Kill => {
203                         if let Err(e) = self.file.fsync() {
204                             error!("failed to fsync serial device, stopping sync thread: {}", e);
205                             return;
206                         }
207                         return;
208                     }
209                 }
210             }
211         }
212     }
213 }
214 
#[cfg(test)]
mod tests {
    use super::*;
    use crate::serial::tests::*;
    use crate::serial::*;

    /// Round-trips bytes through a pipe-backed `Serial`: data written to the device's DATA
    /// register must show up on the client end of the pipe, and data written by the client
    /// must be readable from the device's `in_stream`.
    #[cfg(windows)]
    #[test]
    fn named_pipe() {
        use base::named_pipes;
        use base::named_pipes::BlockingMode;
        use base::named_pipes::FramingMode;
        use rand::Rng;

        // Random suffix in the pipe name, generated fresh on every run.
        let path_str = format!(r"\\.\pipe\crosvm_test_{}", rand::thread_rng().gen::<u64>());

        // Server end of the pipe, opened in non-waiting byte mode.
        let pipe_in = named_pipes::create_server_pipe(
            &path_str,
            &FramingMode::Byte,
            &BlockingMode::NoWait,
            0, // default timeout
            named_pipes::DEFAULT_BUFFER_SIZE,
            false,
        )
        .unwrap();

        // The device takes separate input and output handles, so clone the server end.
        let pipe_out = pipe_in.try_clone().unwrap();
        let event = Event::new().unwrap();

        let mut device = Serial::new_with_pipe(
            ProtectionType::Unprotected,
            event,
            pipe_in,
            pipe_out,
            Default::default(),
            Vec::new(),
        );

        // Client end, opened in waiting mode, used to observe what the device writes.
        let client_pipe = named_pipes::create_client_pipe(
            &path_str,
            &FramingMode::Byte,
            &BlockingMode::Wait,
            false,
        )
        .unwrap();

        // NOTE(review): the unsafe block is presumably required by the pipe read/write calls
        // below — confirm when writing the safety comment requested by the TODO.
        // TODO(b/315998194): Add safety comment
        #[allow(clippy::undocumented_unsafe_blocks)]
        unsafe {
            // Check that serial output is sent to the pipe
            device.write(serial_bus_address(DATA), &[b'T']);
            device.write(serial_bus_address(DATA), &[b'D']);

            let mut read_buf: [u8; 2] = [0; 2];

            assert_eq!(client_pipe.read(&mut read_buf).unwrap(), 2);
            assert_eq!(read_buf, [b'T', b'D']);

            // Check that pipe_in is the other end of client_pipe. It's not actually wired up to
            // SerialInput in this file so we can't test the data flow
            client_pipe
                .write(&[1, 2])
                .expect("Failed to write to client pipe");
            assert_eq!(
                device
                    .system_params
                    .in_stream
                    .as_mut()
                    .unwrap()
                    .read(&mut read_buf)
                    .unwrap(),
                2
            );
            assert_eq!(read_buf, [1, 2]);
        }
    }
}
292