1import cStringIO 2 3from autotest_lib.client.bin import utils 4from autotest_lib.client.common_lib.cros import textfsm 5from autotest_lib.client.common_lib.cros.cfm.usb import usb_device 6 7 8class UsbDeviceCollector(object): 9 """Utility class for obtaining info about connected USB devices.""" 10 11 USB_DEVICES_TEMPLATE = ( 12 'Value Required Vendor ([0-9a-fA-F]+)\n' 13 'Value Required ProdID ([0-9A-Fa-f]+)\n' 14 'Value Required prev ([0-9a-fA-Z.]+)\n' 15 'Value Required Bus ([0-9.]+)\n' 16 'Value Required Port ([0-9.]+)\n' 17 'Value Required Lev ([0-9.]+)\n' 18 'Value Required Dev ([0-9.]+)\n' 19 'Value Required Prnt ([0-9.]+)\n' 20 'Value Manufacturer (.+)\n' 21 'Value Product (.+)\n' 22 'Value serialnumber ([0-9a-fA-Z\:\-]+)\n' 23 'Value cinterfaces (\d)\n' 24 'Value List intindex ([0-9])\n' 25 'Value List intdriver ([A-Za-z-\(\)]+)\n\n' 26 'Start\n' 27 ' ^USB-Device -> Continue.Record\n' 28 ' ^T:\s+Bus=${Bus}\s+Lev=${Lev}\s+Prnt=${Prnt}' 29 '\s+Port=${Port}.*Dev#=\s*${Dev}.*\n' 30 ' ^P:\s+Vendor=${Vendor}\s+ProdID=${ProdID}\sRev=${prev}\n' 31 ' ^S:\s+Manufacturer=${Manufacturer}\n' 32 ' ^S:\s+Product=${Product}\n' 33 ' ^S:\s+SerialNumber=${serialnumber}\n' 34 ' ^C:\s+\#Ifs=\s+${cinterfaces}\n' 35 ' ^I:\s+If\#=\s+${intindex}.*Driver=${intdriver}\n' 36 ) 37 38 def __init__(self, host=None): 39 """ 40 Constructor 41 @param host: An optional host object if running against a remote host. 42 """ 43 self._host = host 44 45 def _extract_usb_data(self, rawdata): 46 """ 47 Populate usb data into a list of dictionaries. 48 @param rawdata The output of "usb-devices" on CfM. 49 @returns list of dictionary, example dictionary: 50 {'Manufacturer': 'USBest Technology', 51 'Product': 'SiS HID Touch Controller', 52 'Vendor': '266e', 53 'intindex': ['0'], 54 'tport': '00', 55 'tcnt': '01', 56 'serialnumber': '', 57 'tlev': '03', 58 'tdev': '18', 59 'dver': '', 60 'intdriver': ['usbhid'], 61 'tbus': '01', 62 'prev': '03.00', 63 'cinterfaces': '1', 64 'ProdID': '0110', 65 'tprnt': '14'} 66 """ 67 usbdata = [] 68 rawdata += '\n' 69 re_table = textfsm.TextFSM(cStringIO.StringIO(self.USB_DEVICES_TEMPLATE)) 70 fsm_results = re_table.ParseText(rawdata) 71 usbdata = [dict(zip(re_table.header, row)) for row in fsm_results] 72 return usbdata 73 74 def _collect_usb_device_data(self): 75 """Collecting usb device data.""" 76 run = utils.run if self._host is None else self._host.run 77 usb_devices = (run('usb-devices', ignore_status=True). 78 stdout.strip().split('\n\n')) 79 return self._extract_usb_data( 80 '\nUSB-Device\n'+'\nUSB-Device\n'.join(usb_devices)) 81 82 83 def _create_usb_device(self, usbdata): 84 return usb_device.UsbDevice( 85 vid=usbdata['Vendor'], 86 pid=usbdata['ProdID'], 87 product=usbdata.get('Product', 'Not available'), 88 interfaces=usbdata['intdriver'], 89 bus=int(usbdata['Bus']), 90 level=int(usbdata['Lev']), 91 # We increment here by 1 because usb-devices reports 0-indexed port 92 # numbers where as lsusb reports 1-indexed. We opted to follow the 93 # the lsusb standard. 94 port=int(usbdata['Port']) + 1) 95 96 def get_usb_devices(self): 97 """ 98 Returns the list of UsbDevices connected to the DUT. 99 @returns A list of UsbDevice instances. 100 """ 101 usbdata = self._collect_usb_device_data() 102 data_and_devices = [] 103 for data in usbdata: 104 usb_device = self._create_usb_device(data) 105 data_and_devices.append((data, usb_device)) 106 # Make a pass to populate parents of the UsbDevices. 107 # We need parent ID and Device ID from the raw data since we do not 108 # care about storing those in a UsbDevice. That's why we bother 109 # iterating through the (data,UsbDevice) pairs. 110 for data, usb_device in data_and_devices: 111 parent_id = int(data['Prnt']) 112 bus = usb_device.bus 113 # Device IDs are not unique across busses. When finding a device's 114 # parent we look for a device with the parent ID on the same bus. 115 usb_device.parent = self._find_device_on_same_bus( 116 data_and_devices, parent_id, bus) 117 return [x[1] for x in data_and_devices] 118 119 def _find_device_on_same_bus(self, data_and_devices, device_id, bus): 120 for data, usb_device in data_and_devices: 121 if int(data['Dev']) == device_id and usb_device.bus == bus: 122 return usb_device 123 return None 124 125 def get_devices_by_spec(self, *specs): 126 """ 127 Returns all UsbDevices that match the any of the given specs. 128 @param specs instances of UsbDeviceSpec. 129 @returns a list UsbDevice instances. 130 """ 131 spec_vid_pids = [spec.vid_pid for spec in specs] 132 return [d for d in self.get_usb_devices() 133 if d.vid_pid in spec_vid_pids] 134