• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2023 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 //! Implements the CrosVM control socket on Windows. Unlike on unix, this is a bit involved because
6 //! we can't process the raw named pipe in line inside `run_control` (named pipes aren't directly
7 //! waitable). In theory, AF_UNIX can be made waitable, but AF_UNIX is very slow, and we already
8 //! have significant prior art for using named pipes in a waitable fashion (`base::StreamChannel`).
9 
10 use std::io;
11 use std::sync::mpsc;
12 use std::sync::Arc;
13 
14 use base::named_pipes;
15 use base::named_pipes::OverlappedWrapper;
16 use base::named_pipes::PipeConnection;
17 use base::BlockingMode;
18 use base::Event;
19 use base::EventExt;
20 use base::EventToken;
21 use base::FlushOnDropTube;
22 use base::FramingMode;
23 use base::ReadNotifier;
24 use base::RecvTube;
25 use base::SendTube;
26 use base::StreamChannel;
27 use base::Tube;
28 use base::TubeError;
29 use base::WaitContext;
30 use base::WorkerThread;
31 use libc::EIO;
32 use log::error;
33 use log::info;
34 use log::warn;
35 use sync::Mutex;
36 use vm_control::VmRequest;
37 use vm_control::VmResponse;
38 
39 /// Windows named pipes don't fit in well with the control loop (`run_control`) the way sockets do
40 /// on unix, so this struct provides a compatibility layer (named pipe server) that functions very
41 /// similarly to how a socket server would on unix.
42 ///
43 /// Terminology:
44 ///     * The `ControlServer` is a socket server compatibility layer.
45 ///     * The "control loop" is the VMM's main loop (`run_control`). It uses the `ControlServer` to
46 ///       accept & service connections from clients that want to control the VMM (e.g. press the
47 ///       power button, etc).
48 pub struct ControlServer {
49     server_listener_worker: WorkerThread<(io::Result<()>, ClientWorker)>,
50     /// Signaled when a client has connected and can be accepted without blocking.
51     client_waiting: Event,
52     /// Provides the accepted Tube every time a client connects.
53     client_tube_channel: mpsc::Receiver<FlushOnDropTube>,
54 }
55 
56 #[derive(EventToken)]
57 enum Token {
58     Exit,
59     Readable,
60 }
61 
62 impl ControlServer {
63     /// Creates a named pipe server on `pipe_name` that forwards Tube messages between the connected
64     /// client on that pipe, and the Tube returned by `ControlServer::accept`.
new(pipe_name: &str) -> anyhow::Result<Self>65     pub fn new(pipe_name: &str) -> anyhow::Result<Self> {
66         let client_pipe_read = named_pipes::create_server_pipe(
67             pipe_name,
68             &named_pipes::FramingMode::Message,
69             &named_pipes::BlockingMode::Wait,
70             /* timeout= */ 0,
71             /* buffer_size= */ 1024 * 1024,
72             /* overlapped= */ true,
73         )?;
74         let client_pipe_write = client_pipe_read.try_clone()?;
75         let mut client_worker = ClientWorker::new(client_pipe_write);
76         let client_waiting = Event::new_auto_reset()?;
77         let client_waiting_for_worker = client_waiting.try_clone()?;
78         let (client_tube_channel_send, client_tube_channel_recv) = mpsc::channel();
79 
80         Ok(Self {
81             server_listener_worker: WorkerThread::start("ctrl_srv_listen_loop", move |exit_evt| {
82                 let res = Self::server_listener_loop(
83                     exit_evt,
84                     &mut client_worker,
85                     client_waiting_for_worker,
86                     client_tube_channel_send,
87                     client_pipe_read,
88                 );
89                 if let Err(e) = res.as_ref() {
90                     error!("server_listener_worker failed with error: {:?}", e)
91                 }
92                 (res, client_worker)
93             }),
94             client_waiting,
95             client_tube_channel: client_tube_channel_recv,
96         })
97     }
98 
99     /// Gets the client waiting event. If a client is waiting, [ControlServer::accept] can be called
100     /// and will return a [base::Tube] without blocking.
client_waiting(&self) -> &Event101     pub fn client_waiting(&self) -> &Event {
102         &self.client_waiting
103     }
104 
105     /// Accepts a connection (if one is waiting), returning a [base::Tube] connected to the client.
106     /// If [ControlServer::client_waiting] has not been signaled, this will block until a client
107     /// connects.
accept(&mut self) -> FlushOnDropTube108     pub fn accept(&mut self) -> FlushOnDropTube {
109         self.client_tube_channel
110             .recv()
111             .expect("client worker has done away")
112     }
113 
114     /// Shuts down the control server, disconnecting any connected clients.
shutdown(self) -> base::Result<()>115     pub fn shutdown(self) -> base::Result<()> {
116         let (listen_res, client_worker) = self.server_listener_worker.stop();
117         match listen_res {
118             Err(e) if e.kind() == io::ErrorKind::Interrupted => (),
119             Err(e) => return Err(base::Error::from(e)),
120             Ok(()) => (),
121         };
122         client_worker.shutdown()
123     }
124 
125     /// Listen loop for the control server. Handles waiting for new connections, creates the
126     /// forwarding thread for control loop -> client data, and forwards client -> control loop
127     /// data.
server_listener_loop( exit_evt: Event, client_worker: &mut ClientWorker, client_waiting: Event, client_tube_send_channel: mpsc::Sender<FlushOnDropTube>, mut client_pipe_read: PipeConnection, ) -> io::Result<()>128     fn server_listener_loop(
129         exit_evt: Event,
130         client_worker: &mut ClientWorker,
131         client_waiting: Event,
132         client_tube_send_channel: mpsc::Sender<FlushOnDropTube>,
133         mut client_pipe_read: PipeConnection,
134     ) -> io::Result<()> {
135         loop {
136             info!("control server: started, waiting for clients.");
137             client_pipe_read.wait_for_client_connection_overlapped_blocking(&exit_evt)?;
138 
139             let mut read_overlapped = OverlappedWrapper::new(true)?;
140             let control_send = client_worker.connect_client(&client_tube_send_channel)?;
141             client_waiting.signal()?;
142             info!("control server: accepted client");
143 
144             loop {
145                 let recv_result = base::deserialize_and_recv::<VmRequest, _>(|buf| {
146                     client_pipe_read.read_overlapped_blocking(
147                         buf,
148                         &mut read_overlapped,
149                         &exit_evt,
150                     )?;
151                     Ok(buf.len())
152                 });
153 
154                 match recv_result {
155                     Ok(msg) => {
156                         control_send.send(&msg).map_err(|e| {
157                             error!("unexpected error in control server recv loop: {}", e);
158                             io::Error::new(io::ErrorKind::Other, e)
159                         })?;
160                     }
161                     Err(TubeError::Disconnected) => break,
162                     Err(e) => {
163                         error!("unexpected error in control server recv loop: {}", e);
164                         return Err(io::Error::new(io::ErrorKind::Other, e));
165                     }
166                 };
167             }
168             // Current client has disconnected. Now we can reuse the server pipe for a new client.
169             match client_pipe_read.disconnect_clients() {
170                 Ok(()) => (),
171                 // If the pipe is already broken/closed, we'll get an error about trying to close
172                 // a pipe that has already been closed. Discard that error.
173                 Err(e) if e.kind() == io::ErrorKind::BrokenPipe => (),
174                 Err(e) => return Err(e),
175             }
176             client_worker.stop_control_to_client_worker()?;
177             info!("control server: disconnected client");
178         }
179         unreachable!("loop exits by returning an error");
180     }
181 }
182 
183 /// Handles connecting clients & forwarding data from client -> control server.
184 struct ClientWorker {
185     control_to_client_worker: Option<WorkerThread<(base::Result<()>, PipeConnection)>>,
186     client_pipe_write: Option<PipeConnection>,
187 }
188 
189 impl ClientWorker {
new(client_pipe_write: PipeConnection) -> Self190     fn new(client_pipe_write: PipeConnection) -> Self {
191         Self {
192             control_to_client_worker: None,
193             client_pipe_write: Some(client_pipe_write),
194         }
195     }
196 
connect_client( &mut self, client_tube_send_channel: &mpsc::Sender<FlushOnDropTube>, ) -> base::Result<SendTube>197     fn connect_client(
198         &mut self,
199         client_tube_send_channel: &mpsc::Sender<FlushOnDropTube>,
200     ) -> base::Result<SendTube> {
201         // It is critical that the server end of the pipe is returned as the Tube in
202         // ControlServer::accept (tube_for_control_loop here). This way, we can ensure data is
203         // flushed before the pipe is dropped. In short, the order of Tubes returned by the pair
204         // matters.
205         let (tube_for_control_loop, tube_to_control_loop) = Tube::pair().map_err(|e| match e {
206             TubeError::Pair(io_err) => base::Error::from(io_err),
207             _ => base::Error::new(EIO),
208         })?;
209 
210         let (control_send, control_recv) =
211             Tube::split_to_send_recv(tube_to_control_loop).map_err(|e| match e {
212                 TubeError::Clone(io_err) => base::Error::from(io_err),
213                 _ => base::Error::new(EIO),
214             })?;
215 
216         let client_pipe_write = self.client_pipe_write.take().expect("loop already running");
217         self.control_to_client_worker = Some(WorkerThread::start(
218             "ctrl_srv_client_to_ctrl",
219             move |exit_evt| {
220                 let res =
221                     Self::control_to_client_worker(exit_evt, &client_pipe_write, control_recv);
222                 if let Err(e) = res.as_ref() {
223                     error!("control_to_client_worker exited with error: {:?}", res);
224                 }
225                 (res, client_pipe_write)
226             },
227         ));
228         client_tube_send_channel
229             .send(FlushOnDropTube::from(tube_for_control_loop))
230             .expect("control server has gone away");
231         Ok(control_send)
232     }
233 
stop_control_to_client_worker(&mut self) -> base::Result<()>234     fn stop_control_to_client_worker(&mut self) -> base::Result<()> {
235         let (res, pipe) = self
236             .control_to_client_worker
237             .take()
238             .expect("loop must be running")
239             .stop();
240         self.client_pipe_write = Some(pipe);
241         res
242     }
243 
shutdown(self) -> base::Result<()>244     fn shutdown(self) -> base::Result<()> {
245         if let Some(worker) = self.control_to_client_worker {
246             worker.stop().0
247         } else {
248             Ok(())
249         }
250     }
251 
252     /// Worker that forwards data from the control loop -> client pipe.
control_to_client_worker( exit_evt: Event, client_pipe_write: &PipeConnection, control_recv: RecvTube, ) -> base::Result<()>253     fn control_to_client_worker(
254         exit_evt: Event,
255         client_pipe_write: &PipeConnection,
256         control_recv: RecvTube,
257     ) -> base::Result<()> {
258         let wait_ctx = WaitContext::new()?;
259         wait_ctx.add(&exit_evt, Token::Exit)?;
260         wait_ctx.add(control_recv.get_read_notifier(), Token::Readable)?;
261 
262         'poll: loop {
263             let events = wait_ctx.wait()?;
264             for event in events {
265                 match event.token {
266                     Token::Exit => {
267                         break 'poll;
268                     }
269                     Token::Readable => {
270                         let msg = match control_recv.recv::<VmResponse>() {
271                             Ok(msg) => Ok(msg),
272                             Err(TubeError::Disconnected) => {
273                                 return Ok(());
274                             }
275                             Err(TubeError::Recv(e)) => Err(base::Error::from(e)),
276                             Err(tube_error) => {
277                                 error!(
278                                     "unexpected error in control server recv loop: {}",
279                                     tube_error
280                                 );
281                                 Err(base::Error::new(EIO))
282                             }
283                         }?;
284                         base::serialize_and_send(|buf| client_pipe_write.write(buf), &msg, None)
285                             .map_err(|e| match e {
286                                 TubeError::Send(e) => base::Error::from(e),
287                                 tube_error => {
288                                     error!(
289                                         "unexpected error in control server recv loop: {}",
290                                         tube_error
291                                     );
292                                     base::Error::new(EIO)
293                                 }
294                             })?;
295                     }
296                 }
297             }
298         }
299         Ok(())
300     }
301 }
302 
#[cfg(test)]
mod tests {
    use std::thread;
    use std::time::Duration;

    use base::PipeTube;
    use rand::Rng;

    use super::*;

    /// Builds a randomized pipe name so iterations and concurrent test runs do not collide.
    fn generate_pipe_name() -> String {
        format!(
            r"\\.\pipe\test-ipc-pipe-name.rand{}",
            rand::thread_rng().gen::<u64>(),
        )
    }

    /// Connects a client to `pipe_name`, retrying up to five times with a 100ms pause between
    /// attempts. Panics with the last connection error if every attempt fails.
    #[track_caller]
    fn create_client(pipe_name: &str) -> PipeTube {
        let mut last_error: Option<io::Error> = None;
        for _ in 0..5 {
            match named_pipes::create_client_pipe(
                pipe_name,
                &named_pipes::FramingMode::Message,
                &named_pipes::BlockingMode::Wait,
                /* overlapped= */ false,
            ) {
                Ok(pipe) => return PipeTube::from(pipe, None),
                Err(e) => {
                    last_error = Some(e);
                    println!("failed client connection");
                    thread::sleep(Duration::from_millis(100))
                }
            }
        }
        panic!(
            "failed to connect to control server: {:?}",
            last_error.unwrap()
        )
    }

    /// Serves two consecutive clients through one `ControlServer`, checking each request and
    /// response, then shuts the server down.
    #[test]
    fn test_smoke() {
        // There are several threads, so run many iterations to exercise any possible race
        // conditions.
        for i in 0..100 {
            println!("starting iteration {}", i);
            let pipe_name = generate_pipe_name();

            let mut control_server = ControlServer::new(&pipe_name).unwrap();
            let fake_control_loop = base::thread::spawn_with_timeout(move || {
                // First client.
                {
                    println!("server: starting client 1");
                    control_server.client_waiting().wait().unwrap();
                    println!("server: woke on client 1");
                    let client1 = control_server.accept();
                    println!("server: accepted client 1");
                    let req: VmRequest = client1.0.recv().unwrap();
                    println!("server: got req from client 1");
                    assert!(matches!(req, VmRequest::Powerbtn));
                    client1.0.send(&VmResponse::Ok).unwrap();
                }
                println!("server: finished client 1");

                // Second client.
                {
                    println!("server: starting client 2");
                    control_server.client_waiting().wait().unwrap();
                    println!("server: woke on client 2");
                    let client2 = control_server.accept();
                    println!("server: accepted client 2");
                    let req: VmRequest = client2.0.recv().unwrap();
                    println!("server: got req from client 2");
                    assert!(matches!(req, VmRequest::Exit));
                    client2
                        .0
                        .send(&VmResponse::ErrString("err".to_owned()))
                        .unwrap();
                }
                println!("server: finished client 2");
                // Hand the server back so the main thread can shut it down.
                control_server
            });

            {
                println!("client: starting client 1");
                let client1 = create_client(&pipe_name);
                client1.send(&VmRequest::Powerbtn).unwrap();
                println!("client: sent client 1 request");
                assert!(matches!(client1.recv().unwrap(), VmResponse::Ok));
                println!("client: finished client 1");
            }

            {
                println!("client: starting client 2");
                let client2 = create_client(&pipe_name);
                client2.send(&VmRequest::Exit).unwrap();
                println!("client: sent client 2 request");
                // A bare identifier in a `matches!` pattern binds any value, so the expected
                // response has to be spelled out as a real pattern plus a guard on the payload.
                assert!(matches!(
                    client2.recv::<VmResponse>().unwrap(),
                    VmResponse::ErrString(msg) if msg == "err"
                ));
                println!("client: finished client 2");
            }

            let control_server = fake_control_loop.try_join(Duration::from_secs(2)).unwrap();
            control_server.shutdown().unwrap();
            println!("completed iteration {}", i);
        }
    }
}
412