1 // Copyright 2022 The ChromiumOS Authors 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 use std::io; 6 use std::io::Write; 7 use std::thread; 8 use std::thread::JoinHandle; 9 use std::time::Duration; 10 11 use base::error; 12 use base::named_pipes::PipeConnection; 13 use base::Event; 14 use base::EventToken; 15 use base::FileSync; 16 use base::RawDescriptor; 17 use base::Result; 18 use base::WaitContext; 19 use hypervisor::ProtectionType; 20 21 use crate::bus::BusDevice; 22 use crate::serial_device::SerialInput; 23 use crate::sys::serial_device::SerialDevice; 24 use crate::Serial; 25 26 // TODO(b/234469655): Remove type alias once ReadNotifier is implemented for 27 // PipeConnection. 28 pub(crate) type InStreamType = Box<PipeConnection>; 29 30 /// Windows specific paramters for the serial device. 31 pub struct SystemSerialParams { 32 pub in_stream: Option<InStreamType>, 33 pub sync: Option<Box<dyn FileSync + Send>>, 34 pub sync_thread: Option<JoinHandle<SyncWorker>>, 35 pub kill_evt: Option<Event>, 36 } 37 38 impl Serial { 39 // Spawn the worker thread if it hasn't been spawned yet. handle_sync_thread(&mut self)40 pub(in crate::serial) fn handle_sync_thread(&mut self) { 41 if self.system_params.sync.is_some() { 42 let sync = match self.system_params.sync.take() { 43 Some(sync) => sync, 44 None => return, 45 }; 46 47 let (self_kill_evt, kill_evt) = match Event::new().and_then(|e| Ok((e.try_clone()?, e))) 48 { 49 Ok(v) => v, 50 Err(e) => { 51 error!("failed creating kill Event pair: {}", e); 52 return; 53 } 54 }; 55 self.system_params.kill_evt = Some(self_kill_evt); 56 57 match thread::Builder::new() 58 .name(format!("{} sync thread", self.debug_label())) 59 .spawn(move || { 60 let mut worker = SyncWorker { 61 kill_evt, 62 file: sync, 63 }; 64 worker.run(); 65 worker 66 }) { 67 Err(e) => { 68 error!("failed to spawn sync thread: {}", e); 69 } 70 Ok(sync_thread) => self.system_params.sync_thread = Some(sync_thread), 71 }; 72 } 73 } 74 } 75 76 impl SerialDevice for Serial { 77 /// Constructs a Serial device ready for input and output. 78 /// 79 /// The stream `input` should not block, instead returning 0 bytes if are no bytes available. new( _protection_type: ProtectionType, interrupt_evt: Event, input: Option<Box<dyn SerialInput>>, out: Option<Box<dyn io::Write + Send>>, sync: Option<Box<dyn FileSync + Send>>, out_timestamp: bool, _keep_rds: Vec<RawDescriptor>, ) -> Serial80 fn new( 81 _protection_type: ProtectionType, 82 interrupt_evt: Event, 83 input: Option<Box<dyn SerialInput>>, 84 out: Option<Box<dyn io::Write + Send>>, 85 sync: Option<Box<dyn FileSync + Send>>, 86 out_timestamp: bool, 87 _keep_rds: Vec<RawDescriptor>, 88 ) -> Serial { 89 let system_params = SystemSerialParams { 90 in_stream: None, 91 sync, 92 sync_thread: None, 93 kill_evt: None, 94 }; 95 Serial::new_common(interrupt_evt, input, out, out_timestamp, system_params) 96 } 97 98 /// Constructs a Serial device connected to a named pipe for I/O 99 /// 100 /// pipe_in and pipe_out should both refer to the same end of the same pipe, but may have 101 /// different underlying descriptors. new_with_pipe( _protection_type: ProtectionType, interrupt_evt: Event, pipe_in: PipeConnection, pipe_out: PipeConnection, _keep_rds: Vec<RawDescriptor>, ) -> Serial102 fn new_with_pipe( 103 _protection_type: ProtectionType, 104 interrupt_evt: Event, 105 pipe_in: PipeConnection, 106 pipe_out: PipeConnection, 107 _keep_rds: Vec<RawDescriptor>, 108 ) -> Serial { 109 let system_params = SystemSerialParams { 110 in_stream: Some(Box::new(pipe_in)), 111 sync: None, 112 sync_thread: None, 113 kill_evt: None, 114 }; 115 let out_timestamp = false; 116 Serial::new_common( 117 interrupt_evt, 118 None, 119 Some(Box::new(pipe_out)), 120 out_timestamp, 121 system_params, 122 ) 123 } 124 } 125 126 impl Drop for Serial { drop(&mut self)127 fn drop(&mut self) { 128 if let Some(kill_evt) = self.system_params.kill_evt.take() { 129 // Ignore the result because there is nothing we can do about it. 130 let _ = kill_evt.signal(); 131 } 132 133 if let Some(sync_thread) = self.system_params.sync_thread.take() { 134 let _ = sync_thread.join(); 135 } 136 } 137 } 138 139 /// Worker to help with flusing contents of `file` to disk. 140 pub struct SyncWorker { 141 kill_evt: Event, 142 file: Box<dyn FileSync + Send>, 143 } 144 145 impl SyncWorker { run(&mut self)146 pub(in crate::serial) fn run(&mut self) { 147 let mut timer = match base::Timer::new() { 148 Err(e) => { 149 error!("failed to create timer for SyncWorker: {}", e); 150 return; 151 } 152 Ok(timer) => timer, 153 }; 154 155 if let Err(e) = timer.reset(Duration::from_secs(1), Some(Duration::from_secs(1))) { 156 error!("failed to set timer for SyncWorker: {}", e); 157 return; 158 } 159 160 #[derive(EventToken)] 161 enum Token { 162 Sync, 163 Kill, 164 } 165 166 let wait_ctx: WaitContext<Token> = match WaitContext::build_with(&[ 167 (&timer, Token::Sync), 168 (&self.kill_evt, Token::Kill), 169 ]) { 170 Ok(ec) => ec, 171 Err(e) => { 172 error!("failed creating WaitContext: {}", e); 173 return; 174 } 175 }; 176 loop { 177 let events = match wait_ctx.wait() { 178 Ok(v) => v, 179 Err(e) => { 180 error!("failed polling for events: {}", e); 181 return; 182 } 183 }; 184 185 for event in events.iter().filter(|e| e.is_readable) { 186 match event.token { 187 Token::Sync => { 188 if let Err(e) = self.file.fsync() { 189 error!("failed to fsync serial device, stopping sync thread: {}", e); 190 return; 191 } 192 } 193 Token::Kill => { 194 if let Err(e) = self.file.fsync() { 195 error!("failed to fsync serial device, stopping sync thread: {}", e); 196 return; 197 } 198 return; 199 } 200 } 201 } 202 } 203 } 204 } 205 206 #[cfg(test)] 207 mod tests { 208 use super::*; 209 use crate::serial::tests::*; 210 use crate::serial::*; 211 212 #[cfg(windows)] 213 #[test] named_pipe()214 fn named_pipe() { 215 use base::named_pipes; 216 use base::named_pipes::BlockingMode; 217 use base::named_pipes::FramingMode; 218 use rand::Rng; 219 220 let path_str = format!(r"\\.\pipe\crosvm_test_{}", rand::thread_rng().gen::<u64>()); 221 222 let pipe_in = named_pipes::create_server_pipe( 223 &path_str, 224 &FramingMode::Byte, 225 &BlockingMode::NoWait, 226 0, // default timeout 227 named_pipes::DEFAULT_BUFFER_SIZE, 228 false, 229 ) 230 .unwrap(); 231 232 let pipe_out = pipe_in.try_clone().unwrap(); 233 let event = Event::new().unwrap(); 234 235 let mut device = Serial::new_with_pipe( 236 ProtectionType::Unprotected, 237 event, 238 pipe_in, 239 pipe_out, 240 Vec::new(), 241 ); 242 243 let client_pipe = named_pipes::create_client_pipe( 244 &path_str, 245 &FramingMode::Byte, 246 &BlockingMode::Wait, 247 false, 248 ) 249 .unwrap(); 250 251 unsafe { 252 // Check that serial output is sent to the pipe 253 device.write(serial_bus_address(DATA), &[b'T']); 254 device.write(serial_bus_address(DATA), &[b'D']); 255 256 let mut read_buf: [u8; 2] = [0; 2]; 257 258 assert_eq!(client_pipe.read(&mut read_buf).unwrap(), 2); 259 assert_eq!(read_buf, [b'T', b'D']); 260 261 // Check that pipe_in is the other end of client_pipe. It's not actually wired up to 262 // SerialInput in this file so we can't test the data flow 263 client_pipe 264 .write(&[1, 2]) 265 .expect("Failed to write to client pipe"); 266 assert_eq!( 267 device 268 .system_params 269 .in_stream 270 .as_mut() 271 .unwrap() 272 .read(&mut read_buf) 273 .unwrap(), 274 2 275 ); 276 assert_eq!(read_buf, [1, 2]); 277 } 278 } 279 } 280