1 // Copyright 2023 The ChromiumOS Authors 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 //! Implements the CrosVM control socket on Windows. Unlike on unix, this is a bit involved because 6 //! we can't process the raw named pipe in line inside `run_control` (named pipes aren't directly 7 //! waitable). In theory, AF_UNIX can be made waitable, but AF_UNIX is very slow, and we already 8 //! have significant prior art for using named pipes in a waitable fashion (`base::StreamChannel`). 9 10 use std::io; 11 use std::sync::mpsc; 12 use std::sync::Arc; 13 14 use base::named_pipes; 15 use base::named_pipes::OverlappedWrapper; 16 use base::named_pipes::PipeConnection; 17 use base::BlockingMode; 18 use base::Event; 19 use base::EventExt; 20 use base::EventToken; 21 use base::FlushOnDropTube; 22 use base::FramingMode; 23 use base::ReadNotifier; 24 use base::RecvTube; 25 use base::SendTube; 26 use base::StreamChannel; 27 use base::Tube; 28 use base::TubeError; 29 use base::WaitContext; 30 use base::WorkerThread; 31 use libc::EIO; 32 use log::error; 33 use log::info; 34 use log::warn; 35 use sync::Mutex; 36 use vm_control::VmRequest; 37 use vm_control::VmResponse; 38 39 /// Windows named pipes don't fit in well with the control loop (`run_control`) the way sockets do 40 /// on unix, so this struct provides a compatibility layer (named pipe server) that functions very 41 /// similarly to how a socket server would on unix. 42 /// 43 /// Terminology: 44 /// * The `ControlServer` is a socket server compatibility layer. 45 /// * The "control loop" is the VMM's main loop (`run_control`). It uses the `ControlServer` to 46 /// accept & service connections from clients that want to control the VMM (e.g. press the 47 /// power button, etc). 48 pub struct ControlServer { 49 server_listener_worker: WorkerThread<(io::Result<()>, ClientWorker)>, 50 /// Signaled when a client has connected and can be accepted without blocking. 51 client_waiting: Event, 52 /// Provides the accepted Tube every time a client connects. 53 client_tube_channel: mpsc::Receiver<FlushOnDropTube>, 54 } 55 56 #[derive(EventToken)] 57 enum Token { 58 Exit, 59 Readable, 60 } 61 62 impl ControlServer { 63 /// Creates a named pipe server on `pipe_name` that forwards Tube messages between the connected 64 /// client on that pipe, and the Tube returned by `ControlServer::accept`. new(pipe_name: &str) -> anyhow::Result<Self>65 pub fn new(pipe_name: &str) -> anyhow::Result<Self> { 66 let client_pipe_read = named_pipes::create_server_pipe( 67 pipe_name, 68 &named_pipes::FramingMode::Message, 69 &named_pipes::BlockingMode::Wait, 70 /* timeout= */ 0, 71 /* buffer_size= */ 1024 * 1024, 72 /* overlapped= */ true, 73 )?; 74 let client_pipe_write = client_pipe_read.try_clone()?; 75 let mut client_worker = ClientWorker::new(client_pipe_write); 76 let client_waiting = Event::new_auto_reset()?; 77 let client_waiting_for_worker = client_waiting.try_clone()?; 78 let (client_tube_channel_send, client_tube_channel_recv) = mpsc::channel(); 79 80 Ok(Self { 81 server_listener_worker: WorkerThread::start("ctrl_srv_listen_loop", move |exit_evt| { 82 let res = Self::server_listener_loop( 83 exit_evt, 84 &mut client_worker, 85 client_waiting_for_worker, 86 client_tube_channel_send, 87 client_pipe_read, 88 ); 89 if let Err(e) = res.as_ref() { 90 error!("server_listener_worker failed with error: {:?}", e) 91 } 92 (res, client_worker) 93 }), 94 client_waiting, 95 client_tube_channel: client_tube_channel_recv, 96 }) 97 } 98 99 /// Gets the client waiting event. If a client is waiting, [ControlServer::accept] can be called 100 /// and will return a [base::Tube] without blocking. client_waiting(&self) -> &Event101 pub fn client_waiting(&self) -> &Event { 102 &self.client_waiting 103 } 104 105 /// Accepts a connection (if one is waiting), returning a [base::Tube] connected to the client. 106 /// If [ControlServer::client_waiting] has not been signaled, this will block until a client 107 /// connects. accept(&mut self) -> FlushOnDropTube108 pub fn accept(&mut self) -> FlushOnDropTube { 109 self.client_tube_channel 110 .recv() 111 .expect("client worker has done away") 112 } 113 114 /// Shuts down the control server, disconnecting any connected clients. shutdown(self) -> base::Result<()>115 pub fn shutdown(self) -> base::Result<()> { 116 let (listen_res, client_worker) = self.server_listener_worker.stop(); 117 match listen_res { 118 Err(e) if e.kind() == io::ErrorKind::Interrupted => (), 119 Err(e) => return Err(base::Error::from(e)), 120 Ok(()) => (), 121 }; 122 client_worker.shutdown() 123 } 124 125 /// Listen loop for the control server. Handles waiting for new connections, creates the 126 /// forwarding thread for control loop -> client data, and forwards client -> control loop 127 /// data. server_listener_loop( exit_evt: Event, client_worker: &mut ClientWorker, client_waiting: Event, client_tube_send_channel: mpsc::Sender<FlushOnDropTube>, mut client_pipe_read: PipeConnection, ) -> io::Result<()>128 fn server_listener_loop( 129 exit_evt: Event, 130 client_worker: &mut ClientWorker, 131 client_waiting: Event, 132 client_tube_send_channel: mpsc::Sender<FlushOnDropTube>, 133 mut client_pipe_read: PipeConnection, 134 ) -> io::Result<()> { 135 loop { 136 info!("control server: started, waiting for clients."); 137 client_pipe_read.wait_for_client_connection_overlapped_blocking(&exit_evt)?; 138 139 let mut read_overlapped = OverlappedWrapper::new(true)?; 140 let control_send = client_worker.connect_client(&client_tube_send_channel)?; 141 client_waiting.signal()?; 142 info!("control server: accepted client"); 143 144 loop { 145 let recv_result = base::deserialize_and_recv::<VmRequest, _>(|buf| { 146 client_pipe_read.read_overlapped_blocking( 147 buf, 148 &mut read_overlapped, 149 &exit_evt, 150 )?; 151 Ok(buf.len()) 152 }); 153 154 match recv_result { 155 Ok(msg) => { 156 control_send.send(&msg).map_err(|e| { 157 error!("unexpected error in control server recv loop: {}", e); 158 io::Error::new(io::ErrorKind::Other, e) 159 })?; 160 } 161 Err(TubeError::Disconnected) => break, 162 Err(e) => { 163 error!("unexpected error in control server recv loop: {}", e); 164 return Err(io::Error::new(io::ErrorKind::Other, e)); 165 } 166 }; 167 } 168 // Current client has disconnected. Now we can reuse the server pipe for a new client. 169 match client_pipe_read.disconnect_clients() { 170 Ok(()) => (), 171 // If the pipe is already broken/closed, we'll get an error about trying to close 172 // a pipe that has already been closed. Discard that error. 173 Err(e) if e.kind() == io::ErrorKind::BrokenPipe => (), 174 Err(e) => return Err(e), 175 } 176 client_worker.stop_control_to_client_worker()?; 177 info!("control server: disconnected client"); 178 } 179 unreachable!("loop exits by returning an error"); 180 } 181 } 182 183 /// Handles connecting clients & forwarding data from client -> control server. 184 struct ClientWorker { 185 control_to_client_worker: Option<WorkerThread<(base::Result<()>, PipeConnection)>>, 186 client_pipe_write: Option<PipeConnection>, 187 } 188 189 impl ClientWorker { new(client_pipe_write: PipeConnection) -> Self190 fn new(client_pipe_write: PipeConnection) -> Self { 191 Self { 192 control_to_client_worker: None, 193 client_pipe_write: Some(client_pipe_write), 194 } 195 } 196 connect_client( &mut self, client_tube_send_channel: &mpsc::Sender<FlushOnDropTube>, ) -> base::Result<SendTube>197 fn connect_client( 198 &mut self, 199 client_tube_send_channel: &mpsc::Sender<FlushOnDropTube>, 200 ) -> base::Result<SendTube> { 201 // It is critical that the server end of the pipe is returned as the Tube in 202 // ControlServer::accept (tube_for_control_loop here). This way, we can ensure data is 203 // flushed before the pipe is dropped. In short, the order of Tubes returned by the pair 204 // matters. 205 let (tube_for_control_loop, tube_to_control_loop) = Tube::pair().map_err(|e| match e { 206 TubeError::Pair(io_err) => base::Error::from(io_err), 207 _ => base::Error::new(EIO), 208 })?; 209 210 let (control_send, control_recv) = 211 Tube::split_to_send_recv(tube_to_control_loop).map_err(|e| match e { 212 TubeError::Clone(io_err) => base::Error::from(io_err), 213 _ => base::Error::new(EIO), 214 })?; 215 216 let client_pipe_write = self.client_pipe_write.take().expect("loop already running"); 217 self.control_to_client_worker = Some(WorkerThread::start( 218 "ctrl_srv_client_to_ctrl", 219 move |exit_evt| { 220 let res = 221 Self::control_to_client_worker(exit_evt, &client_pipe_write, control_recv); 222 if let Err(e) = res.as_ref() { 223 error!("control_to_client_worker exited with error: {:?}", res); 224 } 225 (res, client_pipe_write) 226 }, 227 )); 228 client_tube_send_channel 229 .send(FlushOnDropTube::from(tube_for_control_loop)) 230 .expect("control server has gone away"); 231 Ok(control_send) 232 } 233 stop_control_to_client_worker(&mut self) -> base::Result<()>234 fn stop_control_to_client_worker(&mut self) -> base::Result<()> { 235 let (res, pipe) = self 236 .control_to_client_worker 237 .take() 238 .expect("loop must be running") 239 .stop(); 240 self.client_pipe_write = Some(pipe); 241 res 242 } 243 shutdown(self) -> base::Result<()>244 fn shutdown(self) -> base::Result<()> { 245 if let Some(worker) = self.control_to_client_worker { 246 worker.stop().0 247 } else { 248 Ok(()) 249 } 250 } 251 252 /// Worker that forwards data from the control loop -> client pipe. control_to_client_worker( exit_evt: Event, client_pipe_write: &PipeConnection, control_recv: RecvTube, ) -> base::Result<()>253 fn control_to_client_worker( 254 exit_evt: Event, 255 client_pipe_write: &PipeConnection, 256 control_recv: RecvTube, 257 ) -> base::Result<()> { 258 let wait_ctx = WaitContext::new()?; 259 wait_ctx.add(&exit_evt, Token::Exit)?; 260 wait_ctx.add(control_recv.get_read_notifier(), Token::Readable)?; 261 262 'poll: loop { 263 let events = wait_ctx.wait()?; 264 for event in events { 265 match event.token { 266 Token::Exit => { 267 break 'poll; 268 } 269 Token::Readable => { 270 let msg = match control_recv.recv::<VmResponse>() { 271 Ok(msg) => Ok(msg), 272 Err(TubeError::Disconnected) => { 273 return Ok(()); 274 } 275 Err(TubeError::Recv(e)) => Err(base::Error::from(e)), 276 Err(tube_error) => { 277 error!( 278 "unexpected error in control server recv loop: {}", 279 tube_error 280 ); 281 Err(base::Error::new(EIO)) 282 } 283 }?; 284 base::serialize_and_send(|buf| client_pipe_write.write(buf), &msg, None) 285 .map_err(|e| match e { 286 TubeError::Send(e) => base::Error::from(e), 287 tube_error => { 288 error!( 289 "unexpected error in control server recv loop: {}", 290 tube_error 291 ); 292 base::Error::new(EIO) 293 } 294 })?; 295 } 296 } 297 } 298 } 299 Ok(()) 300 } 301 } 302 303 #[cfg(test)] 304 mod tests { 305 use std::thread; 306 use std::time::Duration; 307 308 use base::PipeTube; 309 use rand::Rng; 310 311 use super::*; 312 generate_pipe_name() -> String313 fn generate_pipe_name() -> String { 314 format!( 315 r"\\.\pipe\test-ipc-pipe-name.rand{}", 316 rand::thread_rng().gen::<u64>(), 317 ) 318 } 319 320 #[track_caller] create_client(pipe_name: &str) -> PipeTube321 fn create_client(pipe_name: &str) -> PipeTube { 322 let mut last_error: Option<io::Error> = None; 323 for _ in 0..5 { 324 match named_pipes::create_client_pipe( 325 pipe_name, 326 &named_pipes::FramingMode::Message, 327 &named_pipes::BlockingMode::Wait, 328 /* overlapped= */ false, 329 ) { 330 Ok(pipe) => return PipeTube::from(pipe, None), 331 Err(e) => { 332 last_error = Some(e); 333 println!("failed client connection"); 334 thread::sleep(Duration::from_millis(100)) 335 } 336 } 337 } 338 panic!( 339 "failed to connect to control server: {:?}", 340 last_error.unwrap() 341 ) 342 } 343 344 #[test] test_smoke()345 fn test_smoke() { 346 // There are several threads, so run many iterations to exercise any possible race 347 // conditions. 348 for i in 0..100 { 349 println!("starting iteration {}", i); 350 let pipe_name = generate_pipe_name(); 351 352 let mut control_server = ControlServer::new(&pipe_name).unwrap(); 353 let fake_control_loop = base::thread::spawn_with_timeout(move || { 354 // First client. 355 { 356 println!("server: starting client 1"); 357 control_server.client_waiting().wait().unwrap(); 358 println!("server: woke on client 1"); 359 let client1 = control_server.accept(); 360 println!("server: accepted client 1"); 361 let req: VmRequest = client1.0.recv().unwrap(); 362 println!("server: got req from client 1"); 363 assert!(matches!(req, VmRequest::Powerbtn)); 364 client1.0.send(&VmResponse::Ok).unwrap(); 365 } 366 println!("server: finished client 1"); 367 368 // Second client. 369 { 370 println!("server: starting client 2"); 371 control_server.client_waiting().wait().unwrap(); 372 println!("server: woke on client 2"); 373 let client2 = control_server.accept(); 374 println!("server: accepted client 2"); 375 let req: VmRequest = client2.0.recv().unwrap(); 376 println!("server: got req from client 2"); 377 assert!(matches!(req, VmRequest::Exit)); 378 client2 379 .0 380 .send(&VmResponse::ErrString("err".to_owned())) 381 .unwrap(); 382 } 383 println!("server: finished client 2"); 384 control_server 385 }); 386 387 { 388 println!("client: starting client 1"); 389 let client1 = create_client(&pipe_name); 390 client1.send(&VmRequest::Powerbtn).unwrap(); 391 println!("client: sent client 1 request"); 392 assert!(matches!(client1.recv().unwrap(), VmResponse::Ok)); 393 println!("client: finished client 1"); 394 } 395 396 { 397 println!("client: starting client 2"); 398 let client2 = create_client(&pipe_name); 399 client2.send(&VmRequest::Exit).unwrap(); 400 println!("client: sent client 2 request"); 401 let resp = VmResponse::ErrString("err".to_owned()); 402 assert!(matches!(client2.recv::<VmResponse>().unwrap(), resp,)); 403 println!("client: finished client 2"); 404 } 405 406 let control_server = fake_control_loop.try_join(Duration::from_secs(2)).unwrap(); 407 control_server.shutdown().unwrap(); 408 println!("completed iteration {}", i); 409 } 410 } 411 } 412