1 // Copyright 2023 The ChromiumOS Authors 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 //! Implements the CrosVM control socket on Windows. Unlike on unix, this is a bit involved because 6 //! we can't process the raw named pipe in line inside `run_control` (named pipes aren't directly 7 //! waitable). In theory, AF_UNIX can be made waitable, but AF_UNIX is very slow, and we already 8 //! have significant prior art for using named pipes in a waitable fashion (`base::StreamChannel`). 9 10 use std::io; 11 use std::sync::mpsc; 12 use std::sync::Arc; 13 14 use base::named_pipes; 15 use base::named_pipes::OverlappedWrapper; 16 use base::named_pipes::PipeConnection; 17 use base::BlockingMode; 18 use base::Event; 19 use base::EventExt; 20 use base::EventToken; 21 use base::FlushOnDropTube; 22 use base::FramingMode; 23 use base::ReadNotifier; 24 use base::RecvTube; 25 use base::SendTube; 26 use base::StreamChannel; 27 use base::Tube; 28 use base::TubeError; 29 use base::WaitContext; 30 use base::WorkerThread; 31 use libc::EIO; 32 use log::error; 33 use log::info; 34 use log::warn; 35 use sync::Mutex; 36 use vm_control::VmRequest; 37 use vm_control::VmResponse; 38 use winapi::shared::winerror::ERROR_MORE_DATA; 39 40 /// Windows named pipes don't fit in well with the control loop (`run_control`) the way sockets do 41 /// on unix, so this struct provides a compatibility layer (named pipe server) that functions very 42 /// similarly to how a socket server would on unix. 43 /// 44 /// Terminology: 45 /// * The `ControlServer` is a socket server compatibility layer. 46 /// * The "control loop" is the VMM's main loop (`run_control`). It uses the `ControlServer` to 47 /// accept & service connections from clients that want to control the VMM (e.g. press the 48 /// power button, etc). 49 pub struct ControlServer { 50 server_listener_worker: WorkerThread<(io::Result<()>, ClientWorker)>, 51 /// Signaled when a client has connected and can be accepted without blocking. 52 client_waiting: Event, 53 /// Provides the accepted Tube every time a client connects. 54 client_tube_channel: mpsc::Receiver<FlushOnDropTube>, 55 } 56 57 #[derive(EventToken)] 58 enum Token { 59 Exit, 60 Readable, 61 } 62 63 impl ControlServer { 64 /// Creates a named pipe server on `pipe_name` that forwards Tube messages between the connected 65 /// client on that pipe, and the Tube returned by `ControlServer::accept`. new(pipe_name: &str) -> anyhow::Result<Self>66 pub fn new(pipe_name: &str) -> anyhow::Result<Self> { 67 let client_pipe_read = named_pipes::create_server_pipe( 68 pipe_name, 69 &named_pipes::FramingMode::Message, 70 &named_pipes::BlockingMode::Wait, 71 /* timeout= */ 0, 72 /* buffer_size= */ 1024 * 1024, 73 /* overlapped= */ true, 74 )?; 75 let client_pipe_write = client_pipe_read.try_clone()?; 76 let mut client_worker = ClientWorker::new(client_pipe_write); 77 let client_waiting = Event::new_auto_reset()?; 78 let client_waiting_for_worker = client_waiting.try_clone()?; 79 let (client_tube_channel_send, client_tube_channel_recv) = mpsc::channel(); 80 81 Ok(Self { 82 server_listener_worker: WorkerThread::start("ctrl_srv_listen_loop", move |exit_evt| { 83 let res = Self::server_listener_loop( 84 exit_evt, 85 &mut client_worker, 86 client_waiting_for_worker, 87 client_tube_channel_send, 88 client_pipe_read, 89 ); 90 if let Err(e) = res.as_ref() { 91 error!("server_listener_worker failed with error: {:?}", e) 92 } 93 (res, client_worker) 94 }), 95 client_waiting, 96 client_tube_channel: client_tube_channel_recv, 97 }) 98 } 99 100 /// Gets the client waiting event. If a client is waiting, [ControlServer::accept] can be called 101 /// and will return a [base::Tube] without blocking. client_waiting(&self) -> &Event102 pub fn client_waiting(&self) -> &Event { 103 &self.client_waiting 104 } 105 106 /// Accepts a connection (if one is waiting), returning a [base::Tube] connected to the client. 107 /// If [ControlServer::client_waiting] has not been signaled, this will block until a client 108 /// connects. accept(&mut self) -> FlushOnDropTube109 pub fn accept(&mut self) -> FlushOnDropTube { 110 self.client_tube_channel 111 .recv() 112 .expect("client worker has done away") 113 } 114 115 /// Shuts down the control server, disconnecting any connected clients. shutdown(self) -> base::Result<()>116 pub fn shutdown(self) -> base::Result<()> { 117 let (listen_res, client_worker) = self.server_listener_worker.stop(); 118 match listen_res { 119 Err(e) if e.kind() == io::ErrorKind::Interrupted => (), 120 Err(e) => return Err(base::Error::from(e)), 121 Ok(()) => (), 122 }; 123 client_worker.shutdown() 124 } 125 126 /// Listen loop for the control server. Handles waiting for new connections, creates the 127 /// forwarding thread for control loop -> client data, and forwards client -> control loop 128 /// data. server_listener_loop( exit_evt: Event, client_worker: &mut ClientWorker, client_waiting: Event, client_tube_send_channel: mpsc::Sender<FlushOnDropTube>, mut client_pipe_read: PipeConnection, ) -> io::Result<()>129 fn server_listener_loop( 130 exit_evt: Event, 131 client_worker: &mut ClientWorker, 132 client_waiting: Event, 133 client_tube_send_channel: mpsc::Sender<FlushOnDropTube>, 134 mut client_pipe_read: PipeConnection, 135 ) -> io::Result<()> { 136 loop { 137 info!("control server: started, waiting for clients."); 138 client_pipe_read.wait_for_client_connection_overlapped_blocking(&exit_evt)?; 139 140 let mut read_overlapped = OverlappedWrapper::new(true)?; 141 let control_send = client_worker.connect_client(&client_tube_send_channel)?; 142 client_waiting.signal()?; 143 info!("control server: accepted client"); 144 145 loop { 146 match base::deserialize_and_recv::<VmRequest, _>(|buf| { 147 client_pipe_read.read_overlapped_blocking( 148 buf, 149 &mut read_overlapped, 150 &exit_evt, 151 )?; 152 Ok(buf.len()) 153 }) { 154 Ok(msg) => { 155 control_send.send(&msg).map_err(|e| { 156 error!("unexpected error in control server recv loop: {}", e); 157 io::Error::new(io::ErrorKind::Other, e) 158 })?; 159 } 160 Err(TubeError::Disconnected) => break, 161 Err(e) => { 162 error!("unexpected error in control server recv loop: {}", e); 163 return Err(io::Error::new(io::ErrorKind::Other, e)); 164 } 165 }; 166 } 167 // Current client has disconnected. Now we can reuse the server pipe for a new client. 168 match client_pipe_read.disconnect_clients() { 169 Ok(()) => (), 170 // If the pipe is already broken/closed, we'll get an error about trying to close 171 // a pipe that has already been closed. Discard that error. 172 Err(e) if e.kind() == io::ErrorKind::BrokenPipe => (), 173 Err(e) => return Err(e), 174 } 175 client_worker.stop_control_to_client_worker()?; 176 info!("control server: disconnected client"); 177 } 178 unreachable!("loop exits by returning an error"); 179 } 180 } 181 182 /// Handles connecting clients & forwarding data from client -> control server. 183 struct ClientWorker { 184 control_to_client_worker: Option<WorkerThread<(base::Result<()>, PipeConnection)>>, 185 client_pipe_write: Option<PipeConnection>, 186 } 187 188 impl ClientWorker { new(client_pipe_write: PipeConnection) -> Self189 fn new(client_pipe_write: PipeConnection) -> Self { 190 Self { 191 control_to_client_worker: None, 192 client_pipe_write: Some(client_pipe_write), 193 } 194 } 195 connect_client( &mut self, client_tube_send_channel: &mpsc::Sender<FlushOnDropTube>, ) -> base::Result<SendTube>196 fn connect_client( 197 &mut self, 198 client_tube_send_channel: &mpsc::Sender<FlushOnDropTube>, 199 ) -> base::Result<SendTube> { 200 // It is critical that the server end of the pipe is returned as the Tube in 201 // ControlServer::accept (tube_for_control_loop here). This way, we can ensure data is 202 // flushed before the pipe is dropped. In short, the order of Tubes returned by the pair 203 // matters. 204 let (tube_for_control_loop, tube_to_control_loop) = Tube::pair().map_err(|e| match e { 205 TubeError::Pair(io_err) => base::Error::from(io_err), 206 _ => base::Error::new(EIO), 207 })?; 208 209 let (control_send, control_recv) = 210 Tube::split_to_send_recv(tube_to_control_loop).map_err(|e| match e { 211 TubeError::Clone(io_err) => base::Error::from(io_err), 212 _ => base::Error::new(EIO), 213 })?; 214 215 let client_pipe_write = self.client_pipe_write.take().expect("loop already running"); 216 self.control_to_client_worker = Some(WorkerThread::start( 217 "ctrl_srv_client_to_ctrl", 218 move |exit_evt| { 219 let res = 220 Self::control_to_client_worker(exit_evt, &client_pipe_write, control_recv); 221 if let Err(e) = res.as_ref() { 222 error!("control_to_client_worker exited with error: {:?}", res); 223 } 224 (res, client_pipe_write) 225 }, 226 )); 227 client_tube_send_channel 228 .send(FlushOnDropTube::from(tube_for_control_loop)) 229 .expect("control server has gone away"); 230 Ok(control_send) 231 } 232 stop_control_to_client_worker(&mut self) -> base::Result<()>233 fn stop_control_to_client_worker(&mut self) -> base::Result<()> { 234 let (res, pipe) = self 235 .control_to_client_worker 236 .take() 237 .expect("loop must be running") 238 .stop(); 239 self.client_pipe_write = Some(pipe); 240 res 241 } 242 shutdown(self) -> base::Result<()>243 fn shutdown(self) -> base::Result<()> { 244 if let Some(worker) = self.control_to_client_worker { 245 worker.stop().0 246 } else { 247 Ok(()) 248 } 249 } 250 251 /// Worker that forwards data from the control loop -> client pipe. control_to_client_worker( exit_evt: Event, client_pipe_write: &PipeConnection, control_recv: RecvTube, ) -> base::Result<()>252 fn control_to_client_worker( 253 exit_evt: Event, 254 client_pipe_write: &PipeConnection, 255 control_recv: RecvTube, 256 ) -> base::Result<()> { 257 let wait_ctx = WaitContext::new()?; 258 wait_ctx.add(&exit_evt, Token::Exit)?; 259 wait_ctx.add(control_recv.get_read_notifier(), Token::Readable)?; 260 261 'poll: loop { 262 let events = wait_ctx.wait()?; 263 for event in events { 264 match event.token { 265 Token::Exit => { 266 break 'poll; 267 } 268 Token::Readable => { 269 let msg = match control_recv.recv::<VmResponse>() { 270 Ok(msg) => Ok(msg), 271 Err(TubeError::Disconnected) => { 272 return Ok(()); 273 } 274 Err(TubeError::Recv(e)) => Err(base::Error::from(e)), 275 Err(tube_error) => { 276 error!( 277 "unexpected error in control server recv loop: {}", 278 tube_error 279 ); 280 Err(base::Error::new(EIO)) 281 } 282 }?; 283 base::serialize_and_send(|buf| client_pipe_write.write(buf), &msg, None) 284 .map_err(|e| match e { 285 TubeError::Send(e) => base::Error::from(e), 286 tube_error => { 287 error!( 288 "unexpected error in control server recv loop: {}", 289 tube_error 290 ); 291 base::Error::new(EIO) 292 } 293 })?; 294 } 295 } 296 } 297 } 298 Ok(()) 299 } 300 } 301 302 #[cfg(test)] 303 mod tests { 304 use std::thread; 305 use std::time::Duration; 306 307 use base::PipeTube; 308 use rand::Rng; 309 310 use super::*; 311 generate_pipe_name() -> String312 fn generate_pipe_name() -> String { 313 format!( 314 r"\\.\pipe\test-ipc-pipe-name.rand{}", 315 rand::thread_rng().gen::<u64>(), 316 ) 317 } 318 319 #[track_caller] create_client(pipe_name: &str) -> PipeTube320 fn create_client(pipe_name: &str) -> PipeTube { 321 let mut last_error: Option<io::Error> = None; 322 for _ in 0..5 { 323 match named_pipes::create_client_pipe( 324 pipe_name, 325 &named_pipes::FramingMode::Message, 326 &named_pipes::BlockingMode::Wait, 327 /* overlapped= */ false, 328 ) { 329 Ok(pipe) => return PipeTube::from(pipe, None), 330 Err(e) => { 331 last_error = Some(e); 332 println!("failed client connection"); 333 thread::sleep(Duration::from_millis(100)) 334 } 335 } 336 } 337 panic!( 338 "failed to connect to control server: {:?}", 339 last_error.unwrap() 340 ) 341 } 342 343 #[test] test_smoke()344 fn test_smoke() { 345 // There are several threads, so run many iterations to exercise any possible race 346 // conditions. 347 for i in 0..100 { 348 println!("starting iteration {}", i); 349 let pipe_name = generate_pipe_name(); 350 351 let mut control_server = ControlServer::new(&pipe_name).unwrap(); 352 let fake_control_loop = base::thread::spawn_with_timeout(move || { 353 // First client. 354 { 355 println!("server: starting client 1"); 356 control_server.client_waiting().wait().unwrap(); 357 let client1 = control_server.accept(); 358 let req: VmRequest = client1.0.recv().unwrap(); 359 assert!(matches!(req, VmRequest::Powerbtn)); 360 client1.0.send(&VmResponse::Ok).unwrap(); 361 } 362 println!("server: finished client 1"); 363 364 // Second client. 365 { 366 println!("server: starting client 2"); 367 control_server.client_waiting().wait().unwrap(); 368 let client2 = control_server.accept(); 369 let req: VmRequest = client2.0.recv().unwrap(); 370 assert!(matches!(req, VmRequest::Exit)); 371 client2 372 .0 373 .send(&VmResponse::ErrString("err".to_owned())) 374 .unwrap(); 375 } 376 println!("server: finished client 2"); 377 control_server 378 }); 379 380 { 381 println!("client: starting client 1"); 382 let client1 = create_client(&pipe_name); 383 client1.send(&VmRequest::Powerbtn).unwrap(); 384 assert!(matches!(client1.recv().unwrap(), VmResponse::Ok)); 385 println!("client: finished client 1"); 386 } 387 388 { 389 println!("client: starting client 2"); 390 let client2 = create_client(&pipe_name); 391 client2.send(&VmRequest::Exit).unwrap(); 392 let resp = VmResponse::ErrString("err".to_owned()); 393 assert!(matches!(client2.recv::<VmResponse>().unwrap(), resp,)); 394 println!("client: finished client 2"); 395 } 396 397 let control_server = fake_control_loop.try_join(Duration::from_secs(2)).unwrap(); 398 control_server.shutdown().unwrap(); 399 println!("completed iteration {}", i); 400 } 401 } 402 } 403