1 // Copyright 2022 The ChromiumOS Authors 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 use std::io; 6 use std::io::Write; 7 use std::thread; 8 use std::thread::JoinHandle; 9 use std::time::Duration; 10 11 use base::error; 12 use base::named_pipes::PipeConnection; 13 use base::Event; 14 use base::EventToken; 15 use base::FileSync; 16 use base::RawDescriptor; 17 use base::Result; 18 use base::TimerTrait; 19 use base::WaitContext; 20 use hypervisor::ProtectionType; 21 22 use crate::bus::BusDevice; 23 use crate::serial_device::SerialInput; 24 use crate::serial_device::SerialOptions; 25 use crate::sys::serial_device::SerialDevice; 26 use crate::Serial; 27 28 // TODO(b/234469655): Remove type alias once ReadNotifier is implemented for 29 // PipeConnection. 30 pub(crate) type InStreamType = Box<PipeConnection>; 31 32 /// Windows specific paramters for the serial device. 33 pub struct SystemSerialParams { 34 pub in_stream: Option<InStreamType>, 35 pub sync: Option<Box<dyn FileSync + Send>>, 36 pub sync_thread: Option<JoinHandle<SyncWorker>>, 37 pub kill_evt: Option<Event>, 38 } 39 40 impl Serial { 41 // Spawn the worker thread if it hasn't been spawned yet. handle_sync_thread(&mut self)42 pub(in crate::serial) fn handle_sync_thread(&mut self) { 43 if self.system_params.sync.is_some() { 44 let sync = match self.system_params.sync.take() { 45 Some(sync) => sync, 46 None => return, 47 }; 48 49 let (self_kill_evt, kill_evt) = match Event::new().and_then(|e| Ok((e.try_clone()?, e))) 50 { 51 Ok(v) => v, 52 Err(e) => { 53 error!("failed creating kill Event pair: {}", e); 54 return; 55 } 56 }; 57 self.system_params.kill_evt = Some(self_kill_evt); 58 59 match thread::Builder::new() 60 .name(format!("{} sync thread", self.debug_label())) 61 .spawn(move || { 62 let mut worker = SyncWorker { 63 kill_evt, 64 file: sync, 65 }; 66 worker.run(); 67 worker 68 }) { 69 Err(e) => { 70 error!("failed to spawn sync thread: {}", e); 71 } 72 Ok(sync_thread) => self.system_params.sync_thread = Some(sync_thread), 73 }; 74 } 75 } 76 } 77 78 impl SerialDevice for Serial { 79 /// Constructs a Serial device ready for input and output. 80 /// 81 /// The stream `input` should not block, instead returning 0 bytes if are no bytes available. new( _protection_type: ProtectionType, interrupt_evt: Event, input: Option<Box<dyn SerialInput>>, out: Option<Box<dyn io::Write + Send>>, sync: Option<Box<dyn FileSync + Send>>, options: SerialOptions, _keep_rds: Vec<RawDescriptor>, ) -> Serial82 fn new( 83 _protection_type: ProtectionType, 84 interrupt_evt: Event, 85 input: Option<Box<dyn SerialInput>>, 86 out: Option<Box<dyn io::Write + Send>>, 87 sync: Option<Box<dyn FileSync + Send>>, 88 options: SerialOptions, 89 _keep_rds: Vec<RawDescriptor>, 90 ) -> Serial { 91 let system_params = SystemSerialParams { 92 in_stream: None, 93 sync, 94 sync_thread: None, 95 kill_evt: None, 96 }; 97 Serial::new_common( 98 interrupt_evt, 99 input, 100 out, 101 options.out_timestamp, 102 system_params, 103 ) 104 } 105 106 /// Constructs a Serial device connected to a named pipe for I/O 107 /// 108 /// pipe_in and pipe_out should both refer to the same end of the same pipe, but may have 109 /// different underlying descriptors. new_with_pipe( _protection_type: ProtectionType, interrupt_evt: Event, pipe_in: PipeConnection, pipe_out: PipeConnection, _options: SerialOptions, _keep_rds: Vec<RawDescriptor>, ) -> Serial110 fn new_with_pipe( 111 _protection_type: ProtectionType, 112 interrupt_evt: Event, 113 pipe_in: PipeConnection, 114 pipe_out: PipeConnection, 115 _options: SerialOptions, 116 _keep_rds: Vec<RawDescriptor>, 117 ) -> Serial { 118 let system_params = SystemSerialParams { 119 in_stream: Some(Box::new(pipe_in)), 120 sync: None, 121 sync_thread: None, 122 kill_evt: None, 123 }; 124 let out_timestamp = false; 125 Serial::new_common( 126 interrupt_evt, 127 None, 128 Some(Box::new(pipe_out)), 129 out_timestamp, 130 system_params, 131 ) 132 } 133 } 134 135 impl Drop for Serial { drop(&mut self)136 fn drop(&mut self) { 137 if let Some(kill_evt) = self.system_params.kill_evt.take() { 138 // Ignore the result because there is nothing we can do about it. 139 let _ = kill_evt.signal(); 140 } 141 142 if let Some(sync_thread) = self.system_params.sync_thread.take() { 143 let _ = sync_thread.join(); 144 } 145 } 146 } 147 148 /// Worker to help with flusing contents of `file` to disk. 149 pub struct SyncWorker { 150 kill_evt: Event, 151 file: Box<dyn FileSync + Send>, 152 } 153 154 impl SyncWorker { run(&mut self)155 pub(in crate::serial) fn run(&mut self) { 156 let mut timer = match base::Timer::new() { 157 Err(e) => { 158 error!("failed to create timer for SyncWorker: {}", e); 159 return; 160 } 161 Ok(timer) => timer, 162 }; 163 164 if let Err(e) = timer.reset(Duration::from_secs(1), Some(Duration::from_secs(1))) { 165 error!("failed to set timer for SyncWorker: {}", e); 166 return; 167 } 168 169 #[derive(EventToken)] 170 enum Token { 171 Sync, 172 Kill, 173 } 174 175 let wait_ctx: WaitContext<Token> = match WaitContext::build_with(&[ 176 (&timer, Token::Sync), 177 (&self.kill_evt, Token::Kill), 178 ]) { 179 Ok(ec) => ec, 180 Err(e) => { 181 error!("failed creating WaitContext: {}", e); 182 return; 183 } 184 }; 185 loop { 186 let events = match wait_ctx.wait() { 187 Ok(v) => v, 188 Err(e) => { 189 error!("failed polling for events: {}", e); 190 return; 191 } 192 }; 193 194 for event in events.iter().filter(|e| e.is_readable) { 195 match event.token { 196 Token::Sync => { 197 if let Err(e) = self.file.fsync() { 198 error!("failed to fsync serial device, stopping sync thread: {}", e); 199 return; 200 } 201 } 202 Token::Kill => { 203 if let Err(e) = self.file.fsync() { 204 error!("failed to fsync serial device, stopping sync thread: {}", e); 205 return; 206 } 207 return; 208 } 209 } 210 } 211 } 212 } 213 } 214 215 #[cfg(test)] 216 mod tests { 217 use super::*; 218 use crate::serial::tests::*; 219 use crate::serial::*; 220 221 #[cfg(windows)] 222 #[test] named_pipe()223 fn named_pipe() { 224 use base::named_pipes; 225 use base::named_pipes::BlockingMode; 226 use base::named_pipes::FramingMode; 227 use rand::Rng; 228 229 let path_str = format!(r"\\.\pipe\crosvm_test_{}", rand::thread_rng().gen::<u64>()); 230 231 let pipe_in = named_pipes::create_server_pipe( 232 &path_str, 233 &FramingMode::Byte, 234 &BlockingMode::NoWait, 235 0, // default timeout 236 named_pipes::DEFAULT_BUFFER_SIZE, 237 false, 238 ) 239 .unwrap(); 240 241 let pipe_out = pipe_in.try_clone().unwrap(); 242 let event = Event::new().unwrap(); 243 244 let mut device = Serial::new_with_pipe( 245 ProtectionType::Unprotected, 246 event, 247 pipe_in, 248 pipe_out, 249 Default::default(), 250 Vec::new(), 251 ); 252 253 let client_pipe = named_pipes::create_client_pipe( 254 &path_str, 255 &FramingMode::Byte, 256 &BlockingMode::Wait, 257 false, 258 ) 259 .unwrap(); 260 261 // TODO(b/315998194): Add safety comment 262 #[allow(clippy::undocumented_unsafe_blocks)] 263 unsafe { 264 // Check that serial output is sent to the pipe 265 device.write(serial_bus_address(DATA), &[b'T']); 266 device.write(serial_bus_address(DATA), &[b'D']); 267 268 let mut read_buf: [u8; 2] = [0; 2]; 269 270 assert_eq!(client_pipe.read(&mut read_buf).unwrap(), 2); 271 assert_eq!(read_buf, [b'T', b'D']); 272 273 // Check that pipe_in is the other end of client_pipe. It's not actually wired up to 274 // SerialInput in this file so we can't test the data flow 275 client_pipe 276 .write(&[1, 2]) 277 .expect("Failed to write to client pipe"); 278 assert_eq!( 279 device 280 .system_params 281 .in_stream 282 .as_mut() 283 .unwrap() 284 .read(&mut read_buf) 285 .unwrap(), 286 2 287 ); 288 assert_eq!(read_buf, [1, 2]); 289 } 290 } 291 } 292