• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2023 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 //! A high-level manager for hotplug PCI devices.
6 
7 // TODO(b/243767476): Support aarch64.
8 use std::cmp::Ordering;
9 use std::collections::BTreeMap;
10 use std::collections::HashMap;
11 use std::collections::VecDeque;
12 use std::sync::mpsc;
13 use std::sync::Arc;
14 
15 use anyhow::anyhow;
16 use anyhow::bail;
17 use anyhow::Context;
18 use anyhow::Error;
19 use arch::RunnableLinuxVm;
20 use arch::VcpuArch;
21 use arch::VmArch;
22 use base::AsRawDescriptor;
23 use base::Event;
24 use base::EventToken;
25 use base::RawDescriptor;
26 use base::WaitContext;
27 use base::WorkerThread;
28 use devices::BusDevice;
29 use devices::HotPlugBus;
30 use devices::HotPlugKey;
31 use devices::IrqEventSource;
32 use devices::IrqLevelEvent;
33 use devices::PciAddress;
34 use devices::PciInterruptPin;
35 use devices::PciRootCommand;
36 use devices::ResourceCarrier;
37 use log::error;
38 use resources::SystemAllocator;
39 #[cfg(feature = "swap")]
40 use swap::SwapDeviceHelper;
41 use sync::Mutex;
42 use vm_memory::GuestMemory;
43 
44 use crate::crosvm::sys::linux::JailWarden;
45 use crate::crosvm::sys::linux::JailWardenImpl;
46 use crate::crosvm::sys::linux::PermissiveJailWarden;
47 use crate::Config;
48 
49 pub type Result<T> = std::result::Result<T, Error>;
50 
/// PciHotPlugManager manages hotplug ports, and handles PCI device hot plug and hot removal.
///
/// A freshly constructed manager has no worker; `set_rootbus_controller` must be called to
/// create the worker client before ports can be added.
pub struct PciHotPlugManager {
    /// Map of the ports managed, keyed by PCI address.
    port_stubs: BTreeMap<PciAddress, PortManagerStub>,
    /// Map of downstream bus number to the upstream PCI address of the port owning that bus.
    bus_address_map: BTreeMap<u8, PciAddress>,
    /// JailWarden for jailing hotplug devices. Which implementation is used is decided in `new`
    /// from whether the config carries a jail configuration.
    jail_warden: Box<dyn JailWarden>,
    /// Client on the manager side of PciHotPlugWorker. `None` until `set_rootbus_controller` is
    /// called.
    worker_client: Option<WorkerClient>,
}
62 
/// WorkerClient is a wrapper of the worker methods.
///
/// It owns the manager-side ends of the command and response channels, plus the event used to
/// tell the worker that a command has been queued.
struct WorkerClient {
    /// Event signaled after a command has been pushed onto `command_sender`.
    control_evt: Event,
    /// Control channel to the worker.
    command_sender: mpsc::Sender<WorkerCommand>,
    /// Response channel from the worker.
    response_receiver: mpsc::Receiver<WorkerResponse>,
    /// Handle of the thread running PciHotPlugWorker. It is never read; it is only stored so it
    /// stays alive together with the client.
    _worker_thread: WorkerThread<Result<()>>,
}
73 
74 impl WorkerClient {
75     /// Constructs PciHotPlugWorker with its client.
new(rootbus_controller: mpsc::Sender<PciRootCommand>) -> Result<Self>76     fn new(rootbus_controller: mpsc::Sender<PciRootCommand>) -> Result<Self> {
77         let (command_sender, command_receiver) = mpsc::channel();
78         let (response_sender, response_receiver) = mpsc::channel();
79         let control_evt = Event::new()?;
80         let control_evt_cpy = control_evt.try_clone()?;
81         let worker_thread = WorkerThread::start("pcihp_mgr_workr", move |kill_evt| {
82             let mut worker = PciHotPlugWorker::new(
83                 rootbus_controller,
84                 command_receiver,
85                 response_sender,
86                 control_evt_cpy,
87                 &kill_evt,
88             )?;
89             worker.run(kill_evt).inspect_err(|e| {
90                 error!("Worker exited with error: {:?}", e);
91             })
92         });
93         Ok(WorkerClient {
94             control_evt,
95             command_sender,
96             response_receiver,
97             _worker_thread: worker_thread,
98         })
99     }
100 
101     /// Sends worker command, and wait for its response.
send_worker_command(&self, command: WorkerCommand) -> Result<WorkerResponse>102     fn send_worker_command(&self, command: WorkerCommand) -> Result<WorkerResponse> {
103         self.command_sender.send(command)?;
104         self.control_evt.signal()?;
105         Ok(self.response_receiver.recv()?)
106     }
107 }
108 
/// PortManagerStub is the manager-side copy of a port.
struct PortManagerStub {
    /// Index of the downstream bus.
    downstream_bus: u8,
    /// Map of hotplugged devices, keyed by PCI address, to the system resources that can be
    /// released when the device is removed.
    devices: HashMap<PciAddress, RecoverableResource>,
}
117 
/// System resources that can be released when a hotplugged device is removed.
struct RecoverableResource {
    // NOTE(review): presumably the IRQ number allocated to the device; confirm against the
    // code that fills this in.
    irq_num: u32,
    // NOTE(review): presumably the level-triggered IRQ event registered for `irq_num`; confirm
    // against the code that fills this in.
    irq_evt: IrqLevelEvent,
}
123 
/// Control commands to worker.
///
/// Each command is answered with exactly one `WorkerResponse`, unless the worker hits an internal
/// error while handling it.
enum WorkerCommand {
    /// Add port to the worker, keyed by the given PCI address. Answered with `AddPortOk`, or
    /// `InvalidCommand` when the address is already registered.
    AddPort(PciAddress, PortWorkerStub),
    /// Get the state of the port. Answered with `GetPortStateOk`, or `InvalidCommand` when the
    /// address is unknown.
    GetPortState(PciAddress),
    /// Get an empty port for hotplug. Returns the least port sorted by PortKey.
    GetEmptyPort,
    /// Signals hot plug on port. Changes an empty port to occupied.
    SignalHotPlug(SignalHotPlugCommand),
    /// Signals hot unplug on port. Changes an occupied port to empty.
    SignalHotUnplug(PciAddress),
}
137 
/// A single guest device that is part of a hot plug request.
#[derive(Clone)]
struct GuestDeviceStub {
    /// Address of the device; sent with the root bus add command and registered on the hotplug
    /// port.
    pci_addr: PciAddress,
    /// Key under which the device is registered on the hotplug port.
    key: HotPlugKey,
    /// The device itself, handed to the root bus when the plug signal is sent.
    device: Arc<Mutex<dyn BusDevice>>,
}
144 
/// Payload of `WorkerCommand::SignalHotPlug`: the target port and the devices to plug into it.
#[derive(Clone)]
struct SignalHotPlugCommand {
    /// The upstream address of the hotplug port.
    upstream_address: PciAddress,
    /// The array of guest devices on the port. The constructor rejects an empty array.
    guest_devices: Vec<GuestDeviceStub>,
}
152 
153 impl SignalHotPlugCommand {
new(upstream_address: PciAddress, guest_devices: Vec<GuestDeviceStub>) -> Result<Self>154     fn new(upstream_address: PciAddress, guest_devices: Vec<GuestDeviceStub>) -> Result<Self> {
155         if guest_devices.is_empty() {
156             bail!("No guest devices");
157         }
158         Ok(Self {
159             upstream_address,
160             guest_devices,
161         })
162     }
163 }
164 
/// PortWorkerStub is the worker-side copy of a port.
#[derive(Clone)]
struct PortWorkerStub {
    /// The downstream base address of the port. Needed to send plug and unplug signal.
    base_address: PciAddress,
    /// Currently attached devices that should be removed. Filled when a plug signal is sent and
    /// drained when an unplug signal is sent.
    attached_devices: Vec<PciAddress>,
    /// Devices to be added each time send_hot_plug_signal is called. Each queue entry is the
    /// batch of devices for one plug signal; batches are consumed from the front.
    devices_to_add: VecDeque<Vec<GuestDeviceStub>>,
    /// The hotplug port.
    port: Arc<Mutex<dyn HotPlugBus>>,
}
177 
178 impl PortWorkerStub {
new(port: Arc<Mutex<dyn HotPlugBus>>, downstream_bus: u8) -> Result<Self>179     fn new(port: Arc<Mutex<dyn HotPlugBus>>, downstream_bus: u8) -> Result<Self> {
180         let base_address = PciAddress::new(0, downstream_bus.into(), 0, 0)?;
181         Ok(Self {
182             base_address,
183             devices_to_add: VecDeque::new(),
184             attached_devices: Vec::new(),
185             port,
186         })
187     }
188 
add_hotplug_devices(&mut self, devices: Vec<GuestDeviceStub>) -> Result<()>189     fn add_hotplug_devices(&mut self, devices: Vec<GuestDeviceStub>) -> Result<()> {
190         if devices.is_empty() {
191             bail!("No guest devices");
192         }
193         self.devices_to_add.push_back(devices);
194         Ok(())
195     }
196 
cancel_queued_add(&mut self) -> Result<()>197     fn cancel_queued_add(&mut self) -> Result<()> {
198         self.devices_to_add
199             .pop_back()
200             .context("No guest device add queued")?;
201         Ok(())
202     }
203 
send_hot_plug_signal( &mut self, rootbus_controller: &mpsc::Sender<PciRootCommand>, ) -> Result<Event>204     fn send_hot_plug_signal(
205         &mut self,
206         rootbus_controller: &mpsc::Sender<PciRootCommand>,
207     ) -> Result<Event> {
208         let mut port_lock = self.port.lock();
209         let devices = self
210             .devices_to_add
211             .pop_front()
212             .context("Missing devices to add")?;
213         for device in devices {
214             rootbus_controller.send(PciRootCommand::Add(device.pci_addr, device.device))?;
215             self.attached_devices.push(device.pci_addr);
216             port_lock.add_hotplug_device(device.key, device.pci_addr);
217         }
218         port_lock
219             .hot_plug(self.base_address)?
220             .context("hotplug bus does not support command complete notification")
221     }
222 
send_hot_unplug_signal( &mut self, rootbus_controller: &mpsc::Sender<PciRootCommand>, ) -> Result<Event>223     fn send_hot_unplug_signal(
224         &mut self,
225         rootbus_controller: &mpsc::Sender<PciRootCommand>,
226     ) -> Result<Event> {
227         for pci_addr in self.attached_devices.drain(..) {
228             rootbus_controller.send(PciRootCommand::Remove(pci_addr))?;
229         }
230         self.port
231             .lock()
232             .hot_unplug(self.base_address)?
233             .context("hotplug bus does not support command complete notification")
234     }
235 }
236 
/// Control response from worker.
///
/// `InvalidCommand` reports a request that was rejected; the worker keeps running after sending
/// it.
#[derive(Debug)]
enum WorkerResponse {
    /// AddPort success.
    AddPortOk,
    /// GetEmptyPort success, use port at PciAddress.
    GetEmptyPortOk(PciAddress),
    /// GetPortState success. The "steps behind" field shall be considered expired, and the guest
    /// is "less than or equal to" n steps behind.
    GetPortStateOk(PortState),
    /// SignalHotPlug or SignalHotUnplug success.
    SignalOk,
    /// Command fail because it is not valid.
    InvalidCommand(Error),
}
252 
253 impl PartialEq for WorkerResponse {
eq(&self, other: &Self) -> bool254     fn eq(&self, other: &Self) -> bool {
255         match (self, other) {
256             (Self::GetEmptyPortOk(l0), Self::GetEmptyPortOk(r0)) => l0 == r0,
257             (Self::GetPortStateOk(l0), Self::GetPortStateOk(r0)) => l0 == r0,
258             (Self::InvalidCommand(_), Self::InvalidCommand(_)) => true,
259             _ => core::mem::discriminant(self) == core::mem::discriminant(other),
260         }
261     }
262 }
263 
/// Tokens identifying what woke the worker's wait context.
///
/// The descriptor carried by the port tokens is the raw descriptor of the triggering event, which
/// is also the key of that event in the worker's event map.
#[derive(Debug, EventToken)]
enum Token {
    /// The kill event was signaled; the worker loop exits.
    Kill,
    /// The manager queued a command.
    ManagerCommand,
    /// A port's ready notification fired.
    PortReady(RawDescriptor),
    /// The event returned by a hot plug signal fired.
    PlugComplete(RawDescriptor),
    /// The event returned by a hot unplug signal fired.
    UnplugComplete(RawDescriptor),
}
272 
/// PciHotPlugWorker is a worker that handles the asynchrony of slot states between crosvm and the
/// guest OS. It is responsible for scheduling the PCIe slot control signals and handle its result.
struct PciHotPlugWorker {
    /// Events currently being waited on, keyed by raw descriptor, each with the address of the
    /// port it belongs to. An entry is removed when its event fires.
    event_map: BTreeMap<RawDescriptor, (Event, PciAddress)>,
    /// Current state of every port, keyed by port address.
    port_state_map: BTreeMap<PciAddress, PortState>,
    /// The ports themselves, keyed by (state, address) so the first entry is the "most empty"
    /// port. A port is re-keyed every time its state changes.
    port_map: BTreeMap<PortKey, PortWorkerStub>,
    /// Event signaled by the manager after it queues a command.
    manager_evt: Event,
    /// Wait context covering the manager event, the kill event, and all entries of `event_map`.
    wait_ctx: WaitContext<Token>,
    /// Receiving end of the command channel from the manager.
    command_receiver: mpsc::Receiver<WorkerCommand>,
    /// Sending end of the response channel to the manager.
    response_sender: mpsc::Sender<WorkerResponse>,
    /// Channel used to add devices to and remove devices from the root bus.
    rootbus_controller: mpsc::Sender<PciRootCommand>,
}
285 
286 impl PciHotPlugWorker {
new( rootbus_controller: mpsc::Sender<PciRootCommand>, command_receiver: mpsc::Receiver<WorkerCommand>, response_sender: mpsc::Sender<WorkerResponse>, manager_evt: Event, kill_evt: &Event, ) -> Result<Self>287     fn new(
288         rootbus_controller: mpsc::Sender<PciRootCommand>,
289         command_receiver: mpsc::Receiver<WorkerCommand>,
290         response_sender: mpsc::Sender<WorkerResponse>,
291         manager_evt: Event,
292         kill_evt: &Event,
293     ) -> Result<Self> {
294         let wait_ctx: WaitContext<Token> = WaitContext::build_with(&[
295             (&manager_evt, Token::ManagerCommand),
296             (kill_evt, Token::Kill),
297         ])?;
298         Ok(Self {
299             event_map: BTreeMap::new(),
300             port_state_map: BTreeMap::new(),
301             port_map: BTreeMap::new(),
302             manager_evt,
303             wait_ctx,
304             command_receiver,
305             response_sender,
306             rootbus_controller,
307         })
308     }
309 
310     /// Starts the worker. Runs until received kill request, or an error that the worker is in an
311     /// invalid state.
run(&mut self, kill_evt: Event) -> Result<()>312     fn run(&mut self, kill_evt: Event) -> Result<()> {
313         'wait: loop {
314             let events = self.wait_ctx.wait()?;
315             for triggered_event in events.iter().filter(|e| e.is_readable) {
316                 match triggered_event.token {
317                     Token::ManagerCommand => {
318                         self.manager_evt.wait()?;
319                         self.handle_manager_command()?;
320                     }
321                     Token::PortReady(descriptor) => {
322                         let (event, pci_address) = self
323                             .event_map
324                             .remove(&descriptor)
325                             .context("Cannot find event")?;
326                         event.wait()?;
327                         self.wait_ctx.delete(&event)?;
328                         self.handle_port_ready(pci_address)?;
329                     }
330                     Token::PlugComplete(descriptor) => {
331                         let (event, pci_address) = self
332                             .event_map
333                             .remove(&descriptor)
334                             .context("Cannot find event")?;
335                         event.wait()?;
336                         self.wait_ctx.delete(&event)?;
337                         self.handle_plug_complete(pci_address)?;
338                     }
339                     Token::UnplugComplete(descriptor) => {
340                         let (event, pci_address) = self
341                             .event_map
342                             .remove(&descriptor)
343                             .context("Cannot find event")?;
344                         self.wait_ctx.delete(&event)?;
345                         self.handle_unplug_complete(pci_address)?;
346                     }
347                     Token::Kill => {
348                         let _ = kill_evt.wait();
349                         break 'wait;
350                     }
351                 }
352             }
353         }
354         Ok(())
355     }
356 
handle_manager_command(&mut self) -> Result<()>357     fn handle_manager_command(&mut self) -> Result<()> {
358         let response = match self.command_receiver.recv()? {
359             WorkerCommand::AddPort(pci_address, port) => self.handle_add_port(pci_address, port),
360             WorkerCommand::GetPortState(pci_address) => self.handle_get_port_state(pci_address),
361             WorkerCommand::GetEmptyPort => self.handle_get_empty_port(),
362             WorkerCommand::SignalHotPlug(hotplug_command) => {
363                 self.handle_plug_request(hotplug_command)
364             }
365             WorkerCommand::SignalHotUnplug(pci_address) => self.handle_unplug_request(pci_address),
366         }?;
367         Ok(self.response_sender.send(response)?)
368     }
369 
370     /// Handles add port: Initiate port in EmptyNotReady state.
handle_add_port( &mut self, pci_address: PciAddress, port: PortWorkerStub, ) -> Result<WorkerResponse>371     fn handle_add_port(
372         &mut self,
373         pci_address: PciAddress,
374         port: PortWorkerStub,
375     ) -> Result<WorkerResponse> {
376         if self.port_state_map.contains_key(&pci_address) {
377             return Ok(WorkerResponse::InvalidCommand(anyhow!(
378                 "Conflicting upstream PCI address"
379             )));
380         }
381         let port_state = PortState::EmptyNotReady;
382         let port_ready_event = port.port.lock().get_ready_notification()?;
383         self.wait_ctx.add(
384             &port_ready_event,
385             Token::PortReady(port_ready_event.as_raw_descriptor()),
386         )?;
387         self.event_map.insert(
388             port_ready_event.as_raw_descriptor(),
389             (port_ready_event, pci_address),
390         );
391         self.port_state_map.insert(pci_address, port_state);
392         self.port_map.insert(
393             PortKey {
394                 port_state,
395                 pci_address,
396             },
397             port,
398         );
399         Ok(WorkerResponse::AddPortOk)
400     }
401 
402     /// Handles get port state: returns the PortState.
handle_get_port_state(&self, pci_address: PciAddress) -> Result<WorkerResponse>403     fn handle_get_port_state(&self, pci_address: PciAddress) -> Result<WorkerResponse> {
404         match self.get_port_state(pci_address) {
405             Ok(ps) => Ok(WorkerResponse::GetPortStateOk(ps)),
406             Err(e) => Ok(WorkerResponse::InvalidCommand(e)),
407         }
408     }
409 
410     /// Handle getting empty port: Find the most empty port, or return error if all are occupied.
handle_get_empty_port(&self) -> Result<WorkerResponse>411     fn handle_get_empty_port(&self) -> Result<WorkerResponse> {
412         let most_empty_port = match self.port_map.first_key_value() {
413             Some(p) => p.0,
414             None => return Ok(WorkerResponse::InvalidCommand(anyhow!("No ports added"))),
415         };
416         match most_empty_port.port_state {
417             PortState::Empty(_) | PortState::EmptyNotReady => {
418                 Ok(WorkerResponse::GetEmptyPortOk(most_empty_port.pci_address))
419             }
420             PortState::Occupied(_) | PortState::OccupiedNotReady => {
421                 Ok(WorkerResponse::InvalidCommand(anyhow!("No empty port")))
422             }
423         }
424     }
425 
426     /// Handles plug request: Moves PortState from EmptyNotReady to OccupiedNotReady, Empty(n) to
427     /// Occupied(n+1), and schedules the next plug event if n == 0.
handle_plug_request( &mut self, hotplug_command: SignalHotPlugCommand, ) -> Result<WorkerResponse>428     fn handle_plug_request(
429         &mut self,
430         hotplug_command: SignalHotPlugCommand,
431     ) -> Result<WorkerResponse> {
432         let pci_address = hotplug_command.upstream_address;
433         let next_state = match self.get_port_state(pci_address) {
434             Ok(PortState::Empty(n)) => {
435                 self.get_port_mut(pci_address)?
436                     .add_hotplug_devices(hotplug_command.guest_devices)?;
437                 if n == 0 {
438                     self.schedule_plug_event(pci_address)?;
439                 }
440                 PortState::Occupied(n + 1)
441             }
442             Ok(PortState::EmptyNotReady) => {
443                 self.get_port_mut(pci_address)?
444                     .add_hotplug_devices(hotplug_command.guest_devices)?;
445                 PortState::OccupiedNotReady
446             }
447             Ok(PortState::Occupied(_)) | Ok(PortState::OccupiedNotReady) => {
448                 return Ok(WorkerResponse::InvalidCommand(anyhow!(
449                     "Attempt to plug into an occupied port"
450                 )))
451             }
452             Err(e) => return Ok(WorkerResponse::InvalidCommand(e)),
453         };
454         self.set_port_state(pci_address, next_state)?;
455         Ok(WorkerResponse::SignalOk)
456     }
457 
458     /// Handles unplug request: Moves PortState from OccupiedNotReady to EmptyNotReady, Occupied(n)
459     /// to Empty(n % 2 + 1), and schedules the next unplug event if n == 0.
460     ///
461     /// n % 2 + 1: When unplug request is made, it either schedule the unplug event
462     /// (n == 0 => 1 or n == 1 => 2), or cancels the corresponding plug event that has not started
463     /// (n == 2 => 1 or n == 3 => 2). Staring at the mapping, it maps n to either 1 or 2 of opposite
464     /// oddity. n % 2 + 1 is a good shorthand instead of the individual mappings.
handle_unplug_request(&mut self, pci_address: PciAddress) -> Result<WorkerResponse>465     fn handle_unplug_request(&mut self, pci_address: PciAddress) -> Result<WorkerResponse> {
466         let next_state = match self.get_port_state(pci_address) {
467             Ok(PortState::Occupied(n)) => {
468                 if n >= 2 {
469                     self.get_port_mut(pci_address)?.cancel_queued_add()?;
470                 }
471                 if n == 0 {
472                     self.schedule_unplug_event(pci_address)?;
473                 }
474                 PortState::Empty(n % 2 + 1)
475             }
476             Ok(PortState::OccupiedNotReady) => PortState::EmptyNotReady,
477             Ok(PortState::Empty(_)) | Ok(PortState::EmptyNotReady) => {
478                 return Ok(WorkerResponse::InvalidCommand(anyhow!(
479                     "Attempt to unplug from an empty port"
480                 )))
481             }
482             Err(e) => return Ok(WorkerResponse::InvalidCommand(e)),
483         };
484         self.set_port_state(pci_address, next_state)?;
485         Ok(WorkerResponse::SignalOk)
486     }
487 
488     /// Handles port ready: Moves PortState from EmptyNotReady to Empty(0), OccupiedNotReady to
489     /// Occupied(1), and schedules the next event if port is occupied
handle_port_ready(&mut self, pci_address: PciAddress) -> Result<()>490     fn handle_port_ready(&mut self, pci_address: PciAddress) -> Result<()> {
491         let next_state = match self.get_port_state(pci_address)? {
492             PortState::EmptyNotReady => PortState::Empty(0),
493             PortState::OccupiedNotReady => {
494                 self.schedule_plug_event(pci_address)?;
495                 PortState::Occupied(1)
496             }
497             PortState::Empty(_) | PortState::Occupied(_) => {
498                 bail!("Received port ready on an already enabled port");
499             }
500         };
501         self.set_port_state(pci_address, next_state)
502     }
503 
504     /// Handles plug complete: Moves PortState from Any(n) to Any(n-1), and schedules the next
505     /// unplug event unless n == 1. (Any is either Empty or Occupied.)
handle_plug_complete(&mut self, pci_address: PciAddress) -> Result<()>506     fn handle_plug_complete(&mut self, pci_address: PciAddress) -> Result<()> {
507         let (n, next_state) = match self.get_port_state(pci_address)? {
508             // Note: n - 1 >= 0 as otherwise there would be no pending events.
509             PortState::Empty(n) => (n, PortState::Empty(n - 1)),
510             PortState::Occupied(n) => (n, PortState::Occupied(n - 1)),
511             PortState::EmptyNotReady | PortState::OccupiedNotReady => {
512                 bail!("Received plug completed on a not enabled port");
513             }
514         };
515         if n > 1 {
516             self.schedule_unplug_event(pci_address)?;
517         }
518         self.set_port_state(pci_address, next_state)
519     }
520 
521     /// Handles unplug complete: Moves PortState from Any(n) to Any(n-1), and schedules the next
522     /// plug event unless n == 1. (Any is either Empty or Occupied.)
handle_unplug_complete(&mut self, pci_address: PciAddress) -> Result<()>523     fn handle_unplug_complete(&mut self, pci_address: PciAddress) -> Result<()> {
524         let (n, next_state) = match self.get_port_state(pci_address)? {
525             // Note: n - 1 >= 0 as otherwise there would be no pending events.
526             PortState::Empty(n) => (n, PortState::Empty(n - 1)),
527             PortState::Occupied(n) => (n, PortState::Occupied(n - 1)),
528             PortState::EmptyNotReady | PortState::OccupiedNotReady => {
529                 bail!("Received unplug completed on a not enabled port");
530             }
531         };
532         if n > 1 {
533             self.schedule_plug_event(pci_address)?;
534         }
535         self.set_port_state(pci_address, next_state)
536     }
537 
get_port_state(&self, pci_address: PciAddress) -> Result<PortState>538     fn get_port_state(&self, pci_address: PciAddress) -> Result<PortState> {
539         Ok(*self
540             .port_state_map
541             .get(&pci_address)
542             .context(format!("Cannot find port state on {}", pci_address))?)
543     }
544 
set_port_state(&mut self, pci_address: PciAddress, port_state: PortState) -> Result<()>545     fn set_port_state(&mut self, pci_address: PciAddress, port_state: PortState) -> Result<()> {
546         let old_port_state = self.get_port_state(pci_address)?;
547         let port = self
548             .port_map
549             .remove(&PortKey {
550                 port_state: old_port_state,
551                 pci_address,
552             })
553             .context("Cannot find port")?;
554         self.port_map.insert(
555             PortKey {
556                 port_state,
557                 pci_address,
558             },
559             port,
560         );
561         self.port_state_map.insert(pci_address, port_state);
562         Ok(())
563     }
564 
schedule_plug_event(&mut self, pci_address: PciAddress) -> Result<()>565     fn schedule_plug_event(&mut self, pci_address: PciAddress) -> Result<()> {
566         let rootbus_controller = self.rootbus_controller.clone();
567         let plug_event = self
568             .get_port_mut(pci_address)?
569             .send_hot_plug_signal(&rootbus_controller)?;
570         self.wait_ctx.add(
571             &plug_event,
572             Token::PlugComplete(plug_event.as_raw_descriptor()),
573         )?;
574         self.event_map
575             .insert(plug_event.as_raw_descriptor(), (plug_event, pci_address));
576         Ok(())
577     }
578 
schedule_unplug_event(&mut self, pci_address: PciAddress) -> Result<()>579     fn schedule_unplug_event(&mut self, pci_address: PciAddress) -> Result<()> {
580         let rootbus_controller = self.rootbus_controller.clone();
581         let unplug_event = self
582             .get_port_mut(pci_address)?
583             .send_hot_unplug_signal(&rootbus_controller)?;
584         self.wait_ctx.add(
585             &unplug_event,
586             Token::UnplugComplete(unplug_event.as_raw_descriptor()),
587         )?;
588         self.event_map.insert(
589             unplug_event.as_raw_descriptor(),
590             (unplug_event, pci_address),
591         );
592         Ok(())
593     }
594 
get_port_mut(&mut self, pci_address: PciAddress) -> Result<&mut PortWorkerStub>595     fn get_port_mut(&mut self, pci_address: PciAddress) -> Result<&mut PortWorkerStub> {
596         let port_state = self.get_port_state(pci_address)?;
597         self.port_map
598             .get_mut(&PortKey {
599                 port_state,
600                 pci_address,
601             })
602             .context("PciHotPlugWorker is in invalid state")
603     }
604 }
605 
/// PortState indicates the state of the port.
///
/// Every port starts as EmptyNotReady (EmpNR). There are 9 possible PortStates, and a port can
/// only move between them through these 3 groups of functions:
/// handle_port_ready(R): guest notification of port ready to accept hot plug events.
/// handle_plug_request(P) and handle_unplug_request(U): host initiated requests.
/// handle_plug_complete(PC) and handle_unplug_complete(UC): guest notification of event completion.
/// While a port is not ready, no events are scheduled, so PC and UC are not expected.
/// The state transition is as follows:
///
/// ```text
///    Emp0<-UC--Emp1<-PC--Emp2            |
///  ^     \    ^    \^   ^    \^          |
/// /       P  /      P\ /      P\         |
/// |        \/        \\        \\        |
/// |        /\        /\\        \\       |
/// R       U  \      U  \U        \U      |
/// |      /    v    /    v\        v\     |
/// |  Occ0<-PC--Occ1<-UC--Occ2<-PC--Occ3  |
/// |              ^                       |
/// \              R                       |
///   EmpNR<-P,U->OccNR                    |
/// ```
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum PortState {
    /// Port is empty on crosvm. The state on the guest OS is n steps behind.
    Empty(u8),
    /// Port is empty on crosvm. The port is not enabled on the guest OS yet.
    EmptyNotReady,
    /// Port is occupied on crosvm. The state on the guest OS is n steps behind.
    Occupied(u8),
    /// Port is occupied on crosvm. The port is not enabled on the guest OS yet.
    OccupiedNotReady,
}

impl PortState {
    /// Rank of the variant alone, ignoring any step count:
    /// Empty < EmptyNotReady < Occupied < OccupiedNotReady.
    fn variant_order_index(&self) -> u8 {
        match *self {
            PortState::Empty(_) => 0,
            PortState::EmptyNotReady => 1,
            PortState::Occupied(_) => 2,
            PortState::OccupiedNotReady => 3,
        }
    }
}

/// Ordering on PortState defined by "most empty".
impl Ord for PortState {
    fn cmp(&self, other: &Self) -> Ordering {
        match self.variant_order_index().cmp(&other.variant_order_index()) {
            // Same variant: the state with fewer steps behind sorts first.
            Ordering::Equal => match (self, other) {
                (PortState::Empty(lhs), PortState::Empty(rhs))
                | (PortState::Occupied(lhs), PortState::Occupied(rhs)) => lhs.cmp(rhs),
                _ => Ordering::Equal,
            },
            // Different variants: the variant rank alone decides.
            unequal => unequal,
        }
    }
}

impl PartialOrd for PortState {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(Ord::cmp(self, other))
    }
}
675 
/// PortKey is a unique identifier of ports with an ordering defined on it.
///
/// Ports are ordered by whose downstream device would be discovered first by the guest OS.
/// Empty ports without pending events are ordered before those with pending events. When multiple
/// empty ports without pending events are available, they are ordered by PCI enumeration.
///
/// The ordering is derived, so the field order below is significant: keys compare by
/// `port_state` first and by `pci_address` only on a tie.
#[derive(PartialEq, Eq, PartialOrd, Ord)]
struct PortKey {
    /// Current state of the port; the primary sort key.
    port_state: PortState,
    /// Address of the port; the tie-breaker between ports in the same state.
    pci_address: PciAddress,
}
686 
687 impl PciHotPlugManager {
688     /// Constructs PciHotPlugManager.
689     ///
690     /// Constructor uses forking, therefore has to be called early, before crosvm enters a
691     /// multi-threaded context.
new( guest_memory: GuestMemory, config: &Config, #[cfg(feature = "swap")] swap_device_helper: Option<SwapDeviceHelper>, ) -> Result<Self>692     pub fn new(
693         guest_memory: GuestMemory,
694         config: &Config,
695         #[cfg(feature = "swap")] swap_device_helper: Option<SwapDeviceHelper>,
696     ) -> Result<Self> {
697         let jail_warden: Box<dyn JailWarden> = match config.jail_config {
698             Some(_) => Box::new(
699                 JailWardenImpl::new(
700                     guest_memory,
701                     config,
702                     #[cfg(feature = "swap")]
703                     swap_device_helper,
704                 )
705                 .context("jail warden construction")?,
706             ),
707             None => Box::new(
708                 PermissiveJailWarden::new(
709                     guest_memory,
710                     config,
711                     #[cfg(feature = "swap")]
712                     swap_device_helper,
713                 )
714                 .context("jail warden construction")?,
715             ),
716         };
717         Ok(Self {
718             jail_warden,
719             port_stubs: BTreeMap::new(),
720             bus_address_map: BTreeMap::new(),
721             worker_client: None,
722         })
723     }
724 
725     /// Starts PciHotPlugManager. Required before any other commands.
726     ///
727     /// PciHotPlugManager::new must be called in a single-threaded context as it forks.
728     /// However, rootbus_controller is only available after VM boots when crosvm is multi-threaded.
729     ///
730     /// TODO(293801301): Remove unused after aarch64 support
731     #[allow(unused)]
set_rootbus_controller( &mut self, rootbus_controller: mpsc::Sender<PciRootCommand>, ) -> Result<()>732     pub fn set_rootbus_controller(
733         &mut self,
734         rootbus_controller: mpsc::Sender<PciRootCommand>,
735     ) -> Result<()> {
736         // Spins the PciHotPlugWorker.
737         self.worker_client = Some(WorkerClient::new(rootbus_controller)?);
738         Ok(())
739     }
740 
741     /// Adds a hotplug capable port to manage.
742     ///
743     /// PciHotPlugManager assumes exclusive control for adding and removing devices to this port.
744     /// TODO(293801301): Remove unused_variables after aarch64 support
745     #[allow(unused)]
add_port(&mut self, port: Arc<Mutex<dyn HotPlugBus>>) -> Result<()>746     pub fn add_port(&mut self, port: Arc<Mutex<dyn HotPlugBus>>) -> Result<()> {
747         let worker_client = self
748             .worker_client
749             .as_ref()
750             .context("No worker thread. Is set_rootbus_controller not called?")?;
751         let port_lock = port.lock();
752         // Rejects hotplug bus with downstream devices.
753         if !port_lock.is_empty() {
754             bail!("invalid hotplug bus");
755         }
756         let pci_address = port_lock
757             .get_address()
758             .context("Hotplug bus PCI address missing")?;
759         // Reject hotplug buses not on rootbus, since otherwise the order of enumeration depends on
760         // the topology of PCI.
761         if pci_address.bus != 0 {
762             bail!("hotplug port on non-root bus not supported");
763         }
764         let downstream_bus = port_lock
765             .get_secondary_bus_number()
766             .context("cannot get downstream bus")?;
767         drop(port_lock);
768         if let Some(prev_address) = self.bus_address_map.insert(downstream_bus, pci_address) {
769             bail!(
770                 "Downstream bus of new port is conflicting with previous port at {}",
771                 &prev_address
772             );
773         }
774         self.port_stubs.insert(
775             pci_address,
776             PortManagerStub {
777                 downstream_bus,
778                 devices: HashMap::new(),
779             },
780         );
781         match worker_client.send_worker_command(WorkerCommand::AddPort(
782             pci_address,
783             PortWorkerStub::new(port, downstream_bus)?,
784         ))? {
785             WorkerResponse::AddPortOk => Ok(()),
786             WorkerResponse::InvalidCommand(e) => Err(e),
787             r => bail!("Unexpected response from worker: {:?}", &r),
788         }
789     }
790 
791     /// hotplugs up to 8 PCI devices as "functions of a device" (in PCI Bus Device Function sense).
792     ///
793     /// returns the bus number of the bus on success.
hotplug_device<V: VmArch, Vcpu: VcpuArch>( &mut self, resource_carriers: Vec<ResourceCarrier>, linux: &mut RunnableLinuxVm<V, Vcpu>, resources: &mut SystemAllocator, ) -> Result<u8>794     pub fn hotplug_device<V: VmArch, Vcpu: VcpuArch>(
795         &mut self,
796         resource_carriers: Vec<ResourceCarrier>,
797         linux: &mut RunnableLinuxVm<V, Vcpu>,
798         resources: &mut SystemAllocator,
799     ) -> Result<u8> {
800         let worker_client = self
801             .worker_client
802             .as_ref()
803             .context("No worker thread. Is set_rootbus_controller not called?")?;
804         if resource_carriers.len() > 8 || resource_carriers.is_empty() {
805             bail!("PCI function count has to be 1 to 8 inclusive");
806         }
807         let pci_address = match worker_client.send_worker_command(WorkerCommand::GetEmptyPort)? {
808             WorkerResponse::GetEmptyPortOk(p) => Ok(p),
809             WorkerResponse::InvalidCommand(e) => Err(e),
810             r => bail!("Unexpected response from worker: {:?}", &r),
811         }?;
812         let port_stub = self
813             .port_stubs
814             .get_mut(&pci_address)
815             .context("Cannot find port")?;
816         let downstream_bus = port_stub.downstream_bus;
817         let mut devices = Vec::new();
818         for (func_num, mut resource_carrier) in resource_carriers.into_iter().enumerate() {
819             let device_address = PciAddress::new(0, downstream_bus as u32, 0, func_num as u32)?;
820             let hotplug_key = HotPlugKey::GuestDevice {
821                 guest_addr: device_address,
822             };
823             resource_carrier.allocate_address(device_address, resources)?;
824             let irq_evt = IrqLevelEvent::new()?;
825             let (pin, irq_num) = match downstream_bus % 4 {
826                 0 => (PciInterruptPin::IntA, 0),
827                 1 => (PciInterruptPin::IntB, 1),
828                 2 => (PciInterruptPin::IntC, 2),
829                 _ => (PciInterruptPin::IntD, 3),
830             };
831             resource_carrier.assign_irq(irq_evt.try_clone()?, pin, irq_num);
832             let (proxy_device, pid) = self
833                 .jail_warden
834                 .make_proxy_device(resource_carrier)
835                 .context("make proxy device")?;
836             let device_id = proxy_device.lock().device_id();
837             let device_name = proxy_device.lock().debug_label();
838             linux.irq_chip.as_irq_chip_mut().register_level_irq_event(
839                 irq_num,
840                 &irq_evt,
841                 IrqEventSource {
842                     device_id,
843                     queue_id: 0,
844                     device_name: device_name.clone(),
845                 },
846             )?;
847             let pid: u32 = pid.try_into().context("fork fail")?;
848             if pid > 0 {
849                 linux.pid_debug_label_map.insert(pid, device_name);
850             }
851             devices.push(GuestDeviceStub {
852                 pci_addr: device_address,
853                 key: hotplug_key,
854                 device: proxy_device,
855             });
856             port_stub
857                 .devices
858                 .insert(device_address, RecoverableResource { irq_num, irq_evt });
859         }
860         // Ask worker to schedule hotplug signal.
861         match worker_client.send_worker_command(WorkerCommand::SignalHotPlug(
862             SignalHotPlugCommand::new(pci_address, devices)?,
863         ))? {
864             WorkerResponse::SignalOk => Ok(downstream_bus),
865             WorkerResponse::InvalidCommand(e) => Err(e),
866             r => bail!("Unexpected response from worker: {:?}", &r),
867         }
868     }
869 
870     /// Removes all hotplugged devices on the hotplug bus.
remove_hotplug_device<V: VmArch, Vcpu: VcpuArch>( &mut self, bus: u8, linux: &mut RunnableLinuxVm<V, Vcpu>, resources: &mut SystemAllocator, ) -> Result<()>871     pub fn remove_hotplug_device<V: VmArch, Vcpu: VcpuArch>(
872         &mut self,
873         bus: u8,
874         linux: &mut RunnableLinuxVm<V, Vcpu>,
875         resources: &mut SystemAllocator,
876     ) -> Result<()> {
877         let worker_client = self
878             .worker_client
879             .as_ref()
880             .context("No worker thread. Is set_rootbus_controller not called?")?;
881         let pci_address = self
882             .bus_address_map
883             .get(&bus)
884             .context(format!("Port {} is not known", &bus))?;
885         match worker_client.send_worker_command(WorkerCommand::GetPortState(*pci_address))? {
886             WorkerResponse::GetPortStateOk(PortState::Occupied(_)) => {}
887             WorkerResponse::GetPortStateOk(PortState::Empty(_)) => {
888                 bail!("Port {} is empty", &bus)
889             }
890             WorkerResponse::InvalidCommand(e) => {
891                 return Err(e);
892             }
893             wr => bail!("Unexpected response from worker: {:?}", &wr),
894         };
895         // Performs a surprise removal. That is, not waiting for hot removal completion before
896         // deleting the resources.
897         match worker_client.send_worker_command(WorkerCommand::SignalHotUnplug(*pci_address))? {
898             WorkerResponse::SignalOk => {}
899             WorkerResponse::InvalidCommand(e) => {
900                 return Err(e);
901             }
902             wr => bail!("Unexpected response from worker: {:?}", &wr),
903         }
904         // Remove all devices on the hotplug bus.
905         let port_stub = self
906             .port_stubs
907             .get_mut(pci_address)
908             .context(format!("Port {} is not known", &bus))?;
909         for (downstream_address, recoverable_resource) in port_stub.devices.drain() {
910             // port_stub.port does not have remove_hotplug_device method, as devices are removed
911             // when hot_unplug is called.
912             resources.release_pci(downstream_address);
913             linux.irq_chip.unregister_level_irq_event(
914                 recoverable_resource.irq_num,
915                 &recoverable_resource.irq_evt,
916             )?;
917         }
918         Ok(())
919     }
920 }
921 
922 #[cfg(test)]
923 mod tests {
924     use std::thread;
925     use std::time::Duration;
926 
927     use devices::DeviceId;
928     use devices::Suspendable;
929     use serde::Deserialize;
930     use serde::Serialize;
931     use snapshot::AnySnapshot;
932 
933     use super::*;
934 
    /// A MockPort that only supports hot_plug and hot_unplug commands, and signaling command
    /// complete manually, which is sufficient for PciHotPlugWorker unit test.
    struct MockPort {
        /// Command-complete event; replaced by every hot_plug/hot_unplug call and fired by
        /// signal_cc.
        cc_event: Event,
        /// Bus number reported by get_secondary_bus_number.
        downstream_bus: u8,
        /// Clones of the events handed out by get_ready_notification; fired and cleared by
        /// signal_ready.
        ready_events: Vec<Event>,
    }
942 
943     impl MockPort {
new(downstream_bus: u8) -> Self944         fn new(downstream_bus: u8) -> Self {
945             Self {
946                 cc_event: Event::new().unwrap(),
947                 downstream_bus,
948                 ready_events: Vec::new(),
949             }
950         }
951 
signal_cc(&self)952         fn signal_cc(&self) {
953             self.cc_event.reset().unwrap();
954             self.cc_event.signal().unwrap();
955         }
956 
signal_ready(&mut self)957         fn signal_ready(&mut self) {
958             for event in self.ready_events.drain(..) {
959                 event.reset().unwrap();
960                 event.signal().unwrap();
961             }
962         }
963     }
964 
    /// Stateless stand-in device used as the payload of the hotplug commands in these tests.
    #[derive(Copy, Clone, Serialize, Deserialize, Eq, PartialEq, Debug)]
    struct MockDevice;
967 
968     impl Suspendable for MockDevice {
snapshot(&mut self) -> anyhow::Result<AnySnapshot>969         fn snapshot(&mut self) -> anyhow::Result<AnySnapshot> {
970             AnySnapshot::to_any(self).context("error serializing")
971         }
972 
restore(&mut self, data: AnySnapshot) -> anyhow::Result<()>973         fn restore(&mut self, data: AnySnapshot) -> anyhow::Result<()> {
974             *self = AnySnapshot::from_any(data).context("error deserializing")?;
975             Ok(())
976         }
977 
sleep(&mut self) -> anyhow::Result<()>978         fn sleep(&mut self) -> anyhow::Result<()> {
979             Ok(())
980         }
981 
wake(&mut self) -> anyhow::Result<()>982         fn wake(&mut self) -> anyhow::Result<()> {
983             Ok(())
984         }
985     }
986 
987     impl BusDevice for MockDevice {
device_id(&self) -> DeviceId988         fn device_id(&self) -> DeviceId {
989             DeviceId::try_from(0xdead_beef).unwrap()
990         }
debug_label(&self) -> String991         fn debug_label(&self) -> String {
992             "mock device".to_owned()
993         }
994     }
995 
996     impl HotPlugBus for MockPort {
hot_plug(&mut self, _addr: PciAddress) -> anyhow::Result<Option<Event>>997         fn hot_plug(&mut self, _addr: PciAddress) -> anyhow::Result<Option<Event>> {
998             self.cc_event = Event::new().unwrap();
999             Ok(Some(self.cc_event.try_clone().unwrap()))
1000         }
1001 
hot_unplug(&mut self, _addr: PciAddress) -> anyhow::Result<Option<Event>>1002         fn hot_unplug(&mut self, _addr: PciAddress) -> anyhow::Result<Option<Event>> {
1003             self.cc_event = Event::new().unwrap();
1004             Ok(Some(self.cc_event.try_clone().unwrap()))
1005         }
1006 
get_ready_notification(&mut self) -> anyhow::Result<Event>1007         fn get_ready_notification(&mut self) -> anyhow::Result<Event> {
1008             let event = Event::new()?;
1009             self.ready_events.push(event.try_clone()?);
1010             Ok(event)
1011         }
1012 
is_match(&self, _host_addr: PciAddress) -> Option<u8>1013         fn is_match(&self, _host_addr: PciAddress) -> Option<u8> {
1014             None
1015         }
1016 
get_address(&self) -> Option<PciAddress>1017         fn get_address(&self) -> Option<PciAddress> {
1018             None
1019         }
1020 
get_secondary_bus_number(&self) -> Option<u8>1021         fn get_secondary_bus_number(&self) -> Option<u8> {
1022             Some(self.downstream_bus)
1023         }
1024 
add_hotplug_device(&mut self, _hotplug_key: HotPlugKey, _guest_addr: PciAddress)1025         fn add_hotplug_device(&mut self, _hotplug_key: HotPlugKey, _guest_addr: PciAddress) {}
1026 
get_hotplug_device(&self, _hotplug_key: HotPlugKey) -> Option<PciAddress>1027         fn get_hotplug_device(&self, _hotplug_key: HotPlugKey) -> Option<PciAddress> {
1028             None
1029         }
1030 
is_empty(&self) -> bool1031         fn is_empty(&self) -> bool {
1032             true
1033         }
1034 
get_hotplug_key(&self) -> Option<HotPlugKey>1035         fn get_hotplug_key(&self) -> Option<HotPlugKey> {
1036             None
1037         }
1038     }
1039 
new_port(downstream_bus: u8) -> Arc<Mutex<MockPort>>1040     fn new_port(downstream_bus: u8) -> Arc<Mutex<MockPort>> {
1041         Arc::new(Mutex::new(MockPort::new(downstream_bus)))
1042     }
1043 
poll_until_with_timeout<F>(f: F, timeout: Duration) -> bool where F: Fn() -> bool,1044     fn poll_until_with_timeout<F>(f: F, timeout: Duration) -> bool
1045     where
1046         F: Fn() -> bool,
1047     {
1048         for _ in 0..timeout.as_millis() {
1049             if f() {
1050                 return true;
1051             }
1052             thread::sleep(Duration::from_millis(1));
1053         }
1054         false
1055     }
1056 
1057     #[test]
worker_empty_port_ordering()1058     fn worker_empty_port_ordering() {
1059         let (rootbus_controller, _rootbus_recvr) = mpsc::channel();
1060         let client = WorkerClient::new(rootbus_controller).unwrap();
1061         // Port A: upstream 00:01.1, downstream 2.
1062         let upstream_addr_a = PciAddress {
1063             bus: 0,
1064             dev: 1,
1065             func: 1,
1066         };
1067         let bus_a = 2;
1068         let downstream_addr_a = PciAddress {
1069             bus: bus_a,
1070             dev: 0,
1071             func: 0,
1072         };
1073         let hotplug_key_a = HotPlugKey::GuestDevice {
1074             guest_addr: downstream_addr_a,
1075         };
1076         let device_a = GuestDeviceStub {
1077             pci_addr: downstream_addr_a,
1078             key: hotplug_key_a,
1079             device: Arc::new(Mutex::new(MockDevice)),
1080         };
1081         let hotplug_command_a =
1082             SignalHotPlugCommand::new(upstream_addr_a, [device_a].to_vec()).unwrap();
1083         let port_a = new_port(bus_a);
1084         // Port B: upstream 00:01.0, downstream 3.
1085         let upstream_addr_b = PciAddress {
1086             bus: 0,
1087             dev: 1,
1088             func: 0,
1089         };
1090         let bus_b = 3;
1091         let downstream_addr_b = PciAddress {
1092             bus: bus_b,
1093             dev: 0,
1094             func: 0,
1095         };
1096         let hotplug_key_b = HotPlugKey::GuestDevice {
1097             guest_addr: downstream_addr_b,
1098         };
1099         let device_b = GuestDeviceStub {
1100             pci_addr: downstream_addr_b,
1101             key: hotplug_key_b,
1102             device: Arc::new(Mutex::new(MockDevice)),
1103         };
1104         let hotplug_command_b =
1105             SignalHotPlugCommand::new(upstream_addr_b, [device_b].to_vec()).unwrap();
1106         let port_b = new_port(bus_b);
1107         // Port C: upstream 00:02.0, downstream 4.
1108         let upstream_addr_c = PciAddress {
1109             bus: 0,
1110             dev: 2,
1111             func: 0,
1112         };
1113         let bus_c = 4;
1114         let downstream_addr_c = PciAddress {
1115             bus: bus_c,
1116             dev: 0,
1117             func: 0,
1118         };
1119         let hotplug_key_c = HotPlugKey::GuestDevice {
1120             guest_addr: downstream_addr_c,
1121         };
1122         let device_c = GuestDeviceStub {
1123             pci_addr: downstream_addr_c,
1124             key: hotplug_key_c,
1125             device: Arc::new(Mutex::new(MockDevice)),
1126         };
1127         let hotplug_command_c =
1128             SignalHotPlugCommand::new(upstream_addr_c, [device_c].to_vec()).unwrap();
1129         let port_c = new_port(bus_c);
1130         assert_eq!(
1131             WorkerResponse::AddPortOk,
1132             client
1133                 .send_worker_command(WorkerCommand::AddPort(
1134                     upstream_addr_a,
1135                     PortWorkerStub::new(port_a.clone(), bus_a).unwrap()
1136                 ))
1137                 .unwrap()
1138         );
1139         assert_eq!(
1140             WorkerResponse::AddPortOk,
1141             client
1142                 .send_worker_command(WorkerCommand::AddPort(
1143                     upstream_addr_b,
1144                     PortWorkerStub::new(port_b.clone(), bus_b).unwrap()
1145                 ))
1146                 .unwrap()
1147         );
1148         assert_eq!(
1149             WorkerResponse::AddPortOk,
1150             client
1151                 .send_worker_command(WorkerCommand::AddPort(
1152                     upstream_addr_c,
1153                     PortWorkerStub::new(port_c.clone(), bus_c).unwrap()
1154                 ))
1155                 .unwrap()
1156         );
1157         port_a.lock().signal_ready();
1158         assert!(poll_until_with_timeout(
1159             || client
1160                 .send_worker_command(WorkerCommand::GetPortState(upstream_addr_a))
1161                 .unwrap()
1162                 == WorkerResponse::GetPortStateOk(PortState::Empty(0)),
1163             Duration::from_millis(500)
1164         ));
1165         port_b.lock().signal_ready();
1166         assert!(poll_until_with_timeout(
1167             || client
1168                 .send_worker_command(WorkerCommand::GetPortState(upstream_addr_b))
1169                 .unwrap()
1170                 == WorkerResponse::GetPortStateOk(PortState::Empty(0)),
1171             Duration::from_millis(500)
1172         ));
1173         port_c.lock().signal_ready();
1174         assert!(poll_until_with_timeout(
1175             || client
1176                 .send_worker_command(WorkerCommand::GetPortState(upstream_addr_c))
1177                 .unwrap()
1178                 == WorkerResponse::GetPortStateOk(PortState::Empty(0)),
1179             Duration::from_millis(500)
1180         ));
1181         // All ports empty and in sync. Should get port B.
1182         assert_eq!(
1183             WorkerResponse::GetEmptyPortOk(upstream_addr_b),
1184             client
1185                 .send_worker_command(WorkerCommand::GetEmptyPort)
1186                 .unwrap()
1187         );
1188         assert_eq!(
1189             WorkerResponse::SignalOk,
1190             client
1191                 .send_worker_command(WorkerCommand::SignalHotPlug(hotplug_command_b))
1192                 .unwrap()
1193         );
1194         // Should get port A.
1195         assert_eq!(
1196             WorkerResponse::GetEmptyPortOk(upstream_addr_a),
1197             client
1198                 .send_worker_command(WorkerCommand::GetEmptyPort)
1199                 .unwrap()
1200         );
1201         assert_eq!(
1202             WorkerResponse::SignalOk,
1203             client
1204                 .send_worker_command(WorkerCommand::SignalHotPlug(hotplug_command_a))
1205                 .unwrap()
1206         );
1207         // Should get port C.
1208         assert_eq!(
1209             WorkerResponse::GetEmptyPortOk(upstream_addr_c),
1210             client
1211                 .send_worker_command(WorkerCommand::GetEmptyPort)
1212                 .unwrap()
1213         );
1214         assert_eq!(
1215             WorkerResponse::SignalOk,
1216             client
1217                 .send_worker_command(WorkerCommand::SignalHotPlug(hotplug_command_c))
1218                 .unwrap()
1219         );
1220         // Should get an error since no port is empty.
1221         if let WorkerResponse::InvalidCommand(_) = client
1222             .send_worker_command(WorkerCommand::GetEmptyPort)
1223             .unwrap()
1224         {
1225             // Assert result is of Error type.
1226         } else {
1227             unreachable!();
1228         }
1229         // Remove device from port A, immediately it should be available.
1230         assert_eq!(
1231             WorkerResponse::SignalOk,
1232             client
1233                 .send_worker_command(WorkerCommand::SignalHotUnplug(upstream_addr_a))
1234                 .unwrap()
1235         );
1236         assert_eq!(
1237             WorkerResponse::GetEmptyPortOk(upstream_addr_a),
1238             client
1239                 .send_worker_command(WorkerCommand::GetEmptyPort)
1240                 .unwrap()
1241         );
1242         // Moreover, it should be 2 steps behind.
1243         assert_eq!(
1244             WorkerResponse::GetPortStateOk(PortState::Empty(2)),
1245             client
1246                 .send_worker_command(WorkerCommand::GetPortState(upstream_addr_a))
1247                 .unwrap()
1248         );
1249     }
1250 
1251     #[test]
worker_port_state_transitions()1252     fn worker_port_state_transitions() {
1253         let (rootbus_controller, _rootbus_recvr) = mpsc::channel();
1254         let client = WorkerClient::new(rootbus_controller).unwrap();
1255         let upstream_addr = PciAddress {
1256             bus: 0,
1257             dev: 1,
1258             func: 1,
1259         };
1260         let bus = 2;
1261         let downstream_addr = PciAddress {
1262             bus,
1263             dev: 0,
1264             func: 0,
1265         };
1266         let hotplug_key = HotPlugKey::GuestDevice {
1267             guest_addr: downstream_addr,
1268         };
1269         let device = GuestDeviceStub {
1270             pci_addr: downstream_addr,
1271             key: hotplug_key,
1272             device: Arc::new(Mutex::new(MockDevice)),
1273         };
1274         let hotplug_command = SignalHotPlugCommand::new(upstream_addr, [device].to_vec()).unwrap();
1275         let port = new_port(bus);
1276         assert_eq!(
1277             WorkerResponse::AddPortOk,
1278             client
1279                 .send_worker_command(WorkerCommand::AddPort(
1280                     upstream_addr,
1281                     PortWorkerStub::new(port.clone(), bus).unwrap()
1282                 ))
1283                 .unwrap()
1284         );
1285         port.lock().signal_ready();
1286         assert!(poll_until_with_timeout(
1287             || client
1288                 .send_worker_command(WorkerCommand::GetPortState(upstream_addr))
1289                 .unwrap()
1290                 == WorkerResponse::GetPortStateOk(PortState::Empty(0)),
1291             Duration::from_millis(500)
1292         ));
1293         assert_eq!(
1294             WorkerResponse::SignalOk,
1295             client
1296                 .send_worker_command(WorkerCommand::SignalHotPlug(hotplug_command.clone()))
1297                 .unwrap()
1298         );
1299         assert!(poll_until_with_timeout(
1300             || client
1301                 .send_worker_command(WorkerCommand::GetPortState(upstream_addr))
1302                 .unwrap()
1303                 == WorkerResponse::GetPortStateOk(PortState::Occupied(1)),
1304             Duration::from_millis(500)
1305         ));
1306         assert_eq!(
1307             WorkerResponse::SignalOk,
1308             client
1309                 .send_worker_command(WorkerCommand::SignalHotUnplug(upstream_addr))
1310                 .unwrap()
1311         );
1312         assert!(poll_until_with_timeout(
1313             || client
1314                 .send_worker_command(WorkerCommand::GetPortState(upstream_addr))
1315                 .unwrap()
1316                 == WorkerResponse::GetPortStateOk(PortState::Empty(2)),
1317             Duration::from_millis(500)
1318         ));
1319         assert_eq!(
1320             WorkerResponse::SignalOk,
1321             client
1322                 .send_worker_command(WorkerCommand::SignalHotPlug(hotplug_command.clone()))
1323                 .unwrap()
1324         );
1325         assert!(poll_until_with_timeout(
1326             || client
1327                 .send_worker_command(WorkerCommand::GetPortState(upstream_addr))
1328                 .unwrap()
1329                 == WorkerResponse::GetPortStateOk(PortState::Occupied(3)),
1330             Duration::from_millis(500)
1331         ));
1332         port.lock().signal_cc();
1333         assert!(poll_until_with_timeout(
1334             || client
1335                 .send_worker_command(WorkerCommand::GetPortState(upstream_addr))
1336                 .unwrap()
1337                 == WorkerResponse::GetPortStateOk(PortState::Occupied(2)),
1338             Duration::from_millis(500)
1339         ));
1340         assert_eq!(
1341             WorkerResponse::SignalOk,
1342             client
1343                 .send_worker_command(WorkerCommand::SignalHotUnplug(upstream_addr))
1344                 .unwrap()
1345         );
1346         // Moves from Occupied(2) to Empty(1) since it is redundant to unplug a device that is yet
1347         // to be plugged in.
1348         assert!(poll_until_with_timeout(
1349             || client
1350                 .send_worker_command(WorkerCommand::GetPortState(upstream_addr))
1351                 .unwrap()
1352                 == WorkerResponse::GetPortStateOk(PortState::Empty(1)),
1353             Duration::from_millis(500)
1354         ));
1355         port.lock().signal_cc();
1356         assert!(poll_until_with_timeout(
1357             || client
1358                 .send_worker_command(WorkerCommand::GetPortState(upstream_addr))
1359                 .unwrap()
1360                 == WorkerResponse::GetPortStateOk(PortState::Empty(0)),
1361             Duration::from_millis(500)
1362         ));
1363     }
1364 
1365     #[test]
worker_port_early_plug_state_transitions()1366     fn worker_port_early_plug_state_transitions() {
1367         let (rootbus_controller, _rootbus_recvr) = mpsc::channel();
1368         let client = WorkerClient::new(rootbus_controller).unwrap();
1369         let upstream_addr = PciAddress {
1370             bus: 0,
1371             dev: 1,
1372             func: 1,
1373         };
1374         let bus = 2;
1375         let downstream_addr = PciAddress {
1376             bus,
1377             dev: 0,
1378             func: 0,
1379         };
1380         let hotplug_key = HotPlugKey::GuestDevice {
1381             guest_addr: downstream_addr,
1382         };
1383         let device = GuestDeviceStub {
1384             pci_addr: downstream_addr,
1385             key: hotplug_key,
1386             device: Arc::new(Mutex::new(MockDevice)),
1387         };
1388         let hotplug_command = SignalHotPlugCommand::new(upstream_addr, [device].to_vec()).unwrap();
1389         let port = new_port(bus);
1390         assert_eq!(
1391             WorkerResponse::AddPortOk,
1392             client
1393                 .send_worker_command(WorkerCommand::AddPort(
1394                     upstream_addr,
1395                     PortWorkerStub::new(port.clone(), bus).unwrap()
1396                 ))
1397                 .unwrap()
1398         );
1399         assert!(poll_until_with_timeout(
1400             || client
1401                 .send_worker_command(WorkerCommand::GetPortState(upstream_addr))
1402                 .unwrap()
1403                 == WorkerResponse::GetPortStateOk(PortState::EmptyNotReady),
1404             Duration::from_millis(500)
1405         ));
1406         assert_eq!(
1407             WorkerResponse::SignalOk,
1408             client
1409                 .send_worker_command(WorkerCommand::SignalHotPlug(hotplug_command.clone()))
1410                 .unwrap()
1411         );
1412         assert!(poll_until_with_timeout(
1413             || client
1414                 .send_worker_command(WorkerCommand::GetPortState(upstream_addr))
1415                 .unwrap()
1416                 == WorkerResponse::GetPortStateOk(PortState::OccupiedNotReady),
1417             Duration::from_millis(500)
1418         ));
1419         port.lock().signal_ready();
1420         assert!(poll_until_with_timeout(
1421             || client
1422                 .send_worker_command(WorkerCommand::GetPortState(upstream_addr))
1423                 .unwrap()
1424                 == WorkerResponse::GetPortStateOk(PortState::Occupied(1)),
1425             Duration::from_millis(500)
1426         ));
1427         port.lock().signal_cc();
1428         assert!(poll_until_with_timeout(
1429             || client
1430                 .send_worker_command(WorkerCommand::GetPortState(upstream_addr))
1431                 .unwrap()
1432                 == WorkerResponse::GetPortStateOk(PortState::Occupied(0)),
1433             Duration::from_millis(500)
1434         ));
1435     }
1436 }
1437