#!/usr/bin/python2

# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""This is a module to scan /sys/block/ virtual FS, query udev

It provides a list of all removable or USB devices connected to the machine on
which the module is running.
It can be used from command line or from a python script.

To use it as python module it's enough to call the get_all() function.
@see |get_all| documentation for the output format
The command line output is human readable (as opposed to python's data
structures)
"""

import logging
import os
import re

# this script can be run at command line on DUT (ie /usr/local/autotest
# contains only the client/ subtree), on a normal autotest
# installation/repository or as a python module used on a client-side test.
import common

from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib import utils

INFO_PATH = "/sys/block"
UDEV_CMD_FOR_SERIAL_NUMBER = ("udevadm info -a -n %s | grep -iE 'ATTRS{"
                              "serial}' | head -n 1")
LSUSB_CMD = "lsusb -v | grep -iE '^Device Desc|bcdUSB|iSerial'"
DESC_PATTERN = r'Device Descriptor:'
BCDUSB_PATTERN = r'bcdUSB\s+(\d+\.\d+)'
ISERIAL_PATTERN = r'iSerial\s+\d\s(\S*)'
UDEV_SERIAL_PATTERN = r'=="(.*)"'


def read_file(path_to_file, host=None):
    """Reads the file and returns the file content

    @param path_to_file: Full path to the file
    @param host: DUT object; when given the file is read on the DUT with
                 'cat', otherwise it is read from the local filesystem
    @return: Returns the content of file, stripped of surrounding whitespace
    @raise error.TestError: if the file does not exist
    """
    if host:
        if not host.path_exists(path_to_file):
            raise error.TestError("No such file or directory %s" % path_to_file)
        return host.run('cat %s' % path_to_file).stdout.strip()

    if not os.path.isfile(path_to_file):
        raise error.TestError("No such file or directory %s" % path_to_file)
    return utils.read_file(path_to_file).strip()


def system_output(command, host=None, ignore_status=False):
    """Executes command on client

    @param command: command to execute
    @param host: DUT object; when given the command is run through
                 |host.run|, otherwise through |utils.system_output|
    @param ignore_status: forwarded unchanged to the underlying runner
    @return: output of command
    """
    if host:
        return host.run(command, ignore_status=ignore_status).stdout.strip()

    return utils.system_output(command, ignore_status=ignore_status)


def get_udev_info(blockdev, method='udev', host=None):
    """Get information about |blockdev|

    @param blockdev: a block device, e.g., /dev/sda1 or /dev/sda
    @param method: either 'udev' (default) or 'blkid'; any other value
                   results in an empty dictionary
    @param host: DUT object

    @return a dictionary with two or more of the following keys:
        "ID_BUS", "ID_MODEL": always present
        "ID_FS_UUID", "ID_FS_TYPE", "ID_FS_LABEL": present only if those info
        are meaningful and present for the queried device
    """
    ret = {}
    cmd = None
    ignore_status = False

    if method == "udev":
        cmd = "udevadm info --name %s --query=property" % blockdev
    elif method == "blkid":
        # this script is run as root in a normal autotest run,
        # so this works: It doesn't have access to the necessary info
        # when run as a non-privileged user
        cmd = "blkid -c /dev/null -o udev %s" % blockdev
        ignore_status = True

    if not cmd:
        return ret

    output = system_output(cmd, host, ignore_status=ignore_status)

    udev_keys = ("ID_BUS", "ID_MODEL", "ID_FS_UUID", "ID_FS_TYPE",
                 "ID_FS_LABEL")
    for line in output.splitlines():
        # Split on the first '=' only: the value itself may contain '='
        # (e.g. in a filesystem label), and lines without any '=' (blank
        # lines, diagnostics) carry no property and are skipped.
        udev_key, separator, udev_val = line.partition('=')
        if not separator:
            continue

        if udev_key in udev_keys:
            ret[udev_key] = udev_val

    return ret


def get_lsusb_info(host=None):
    """Get lsusb info in list format

    @param host: DUT object
    @return: Returns lsusb output in list format: one dictionary per device
             for which an iSerial line was found, with the key 'iSerial' and,
             when a bcdUSB line preceded it, 'bcdUSB'
    """

    usb_info_list = []
    # Getting the USB type and Serial number info using 'lsusb -v'. Sample
    # output is shown in below
    # Device Descriptor:
    #      bcdUSB               2.00
    #      iSerial                 3 131BC7
    #      bcdUSB               2.00
    # Device Descriptor:
    #      bcdUSB               2.10
    #      iSerial                 3 001A4D5E8634B03169273995

    lsusb_output = system_output(LSUSB_CMD, host)
    # Dictionary for the device currently being parsed; None until the first
    # 'Device Descriptor:' header has been seen.
    usb_info = None
    # we are parsing each line and getting the usb info
    for line in lsusb_output.splitlines():
        if re.search(DESC_PATTERN, line):
            usb_info = {}
            continue

        if usb_info is None:
            # Nothing to attach this line to yet.
            continue

        bcdusb_matched = re.search(BCDUSB_PATTERN, line)
        if bcdusb_matched:
            # bcdUSB may appear multiple time. Keep the first one (the one
            # right after the descriptor header) and drop the remaining.
            usb_info.setdefault('bcdUSB', bcdusb_matched.group(1))
            continue

        iserial_matched = re.search(ISERIAL_PATTERN, line)
        if iserial_matched:
            usb_info['iSerial'] = iserial_matched.group(1)
            usb_info_list.append(usb_info)
    logging.debug('lsusb output is %s', usb_info_list)
    return usb_info_list


def get_usbdevice_type_and_serial(device, lsusb_info, host=None):
    """Get USB device type and Serial number

    @param device: USB device mount point Example: /dev/sda or /dev/sdb
    @param lsusb_info: lsusb info, as returned by |get_lsusb_info()|
    @param host: DUT object
    @return: Returns the information about USB type and the serial number
            of the device as a (bcdUSB, serial) tuple, or (None, None) when
            udev reports no serial or no lsusb entry has the same serial
    """

    # Comparing the lsusb serial number with udev output serial number
    # Both serial numbers should be same. Sample udev command output is
    # shown in below.
    # ATTRS{serial}=="001A4D5E8634B03169273995"
    udev_serial_output = system_output(UDEV_CMD_FOR_SERIAL_NUMBER % device,
                                       host)
    udev_serial_matched = re.search(UDEV_SERIAL_PATTERN, udev_serial_output)
    if not udev_serial_matched:
        return None, None

    udev_serial = udev_serial_matched.group(1)
    logging.debug("udev serial number is %s", udev_serial)
    for usb_details in lsusb_info:
        if usb_details.get('iSerial') == udev_serial:
            return usb_details.get('bcdUSB'), udev_serial
    return None, None


def _parse_mount_line(line):
    """Split one line of /proc/mounts into its leading fields

    @param line: a single line read from /proc/mounts
    @return a (device, mountpoint, fstype, options) tuple where options is
            the list of comma separated mount options, or None if the line
            has fewer than four fields
    """
    fields = line.split()
    if len(fields) < 4:
        return None
    # Only the fourth field holds the options; taking it on its own keeps
    # the trailing dump/pass fields from sticking to the last option.
    return fields[0], fields[1], fields[2], fields[3].split(',')


def _build_partition_entry(stub, partid, fstype, is_mounted, mountpoint,
                           is_removable, lsusb_info, host):
    """Build one partition dictionary on top of a copy of |stub|

    @param stub: dictionary with the keys common to every entry of a device
    @param partid: partition id stored as 'fs_uuid'
    @param fstype: filesystem type stored as 'fstype'
    @param is_mounted: whether the entry describes a mounted filesystem
    @param mountpoint: actual or suggested mountpoint
    @param is_removable: whether it is a removable device
    @param lsusb_info: lsusb info
    @param host: DUT object
    @return the new dictionary; |stub| itself is left untouched
    """
    entry = stub.copy()
    entry['fs_uuid'] = partid
    entry['fstype'] = fstype
    entry['is_mounted'] = is_mounted
    entry['mountpoint'] = mountpoint
    entry['is_removable'] = is_removable
    entry['usb_type'], entry['serial'] = get_usbdevice_type_and_serial(
            entry['device'], lsusb_info=lsusb_info, host=host)
    return entry


def get_partition_info(part_path, bus, model, partid=None, fstype=None,
                       label=None, block_size=0, is_removable=False,
                       lsusb_info=None, host=None):
    """Return information about a device as a list of dictionaries

    Normally a single device described by the passed parameters will match a
    single device on the system, and thus a single element list as return
    value; although it's possible that a single block device is associated with
    several mountpoints, this scenario will lead to a dictionary for each
    mountpoint.

    @param part_path: full partition path under |INFO_PATH|
                      e.g., /sys/block/sda or /sys/block/sda/sda1
    @param bus: bus, e.g., 'usb' or 'ata', according to udev
    @param model: device model, e.g., according to udev
    @param partid: partition id, if present
    @param fstype: filesystem type, if present
    @param label: filesystem label, if present
    @param block_size: filesystem block size
    @param is_removable: whether it is a removable device
    @param lsusb_info: lsusb info; None (default) is treated as an empty list
    @param host: DUT object

    @return a list of dictionaries containing each a partition info.
            An empty list can be returned if no matching device is found
            or if the device is read-only
    """
    if lsusb_info is None:
        lsusb_info = []

    ret = []
    # take the partitioned device name from the /sys/block/ path name
    part = part_path.split('/')[-1]
    device = "/dev/%s" % part

    if not partid:
        info = get_udev_info(device, "blkid", host=host)
        partid = info.get('ID_FS_UUID', None)
        if not fstype:
            fstype = info.get('ID_FS_TYPE', None)
        if not label:
            label = partid

    # Read-only devices are not reported at all.
    readonly = read_file("%s/ro" % part_path, host)
    if int(readonly):
        return ret

    partition_blocks = read_file("%s/size" % part_path, host)
    size = block_size * int(partition_blocks)

    stub = {}
    stub['device'] = device
    stub['bus'] = bus
    stub['model'] = model
    stub['size'] = size

    # look for it among the mounted devices first
    mounts = read_file("/proc/mounts", host).splitlines()
    seen = False
    for line in mounts:
        parsed = _parse_mount_line(line)
        if parsed is None:
            continue
        mounted_dev, mount, proc_fstype, options = parsed

        if device != mounted_dev or 'rw' not in options:
            continue

        seen = True # at least one match occurred

        # Information retrieved from /proc/mounts overrides the udev passed
        # ones (e.g., proc_fstype instead of fstype).
        # When USB device is mounted automatically after login a
        # non-labelled drive is mounted to:
        # '/media/removable/USB Drive'
        # Here an octal unicode '\040' is added to the path
        # replacing ' ' (space).
        # Following '.decode('unicode-escape')' handles the same
        mountpoint = mount.decode('unicode-escape')
        ret.append(_build_partition_entry(stub, partid, proc_fstype, True,
                                          mountpoint, is_removable,
                                          lsusb_info, host))

    # If not among mounted devices, it's just attached, print about the
    # same information but suggest a place where the user can mount the
    # device instead
    if not seen:
        # we consider it if it's removable and has a partition id
        # OR it's on the USB bus or ATA bus.
        # Some USB HD do not get announced as removable, but they should be
        # showed.
        # There are good chances that if it's on a USB bus it's removable
        # and thus interesting for us, independently whether it's declared
        # removable
        if (is_removable and partid) or bus in ['usb', 'ata']:
            if not label:
                info = get_udev_info(device, 'blkid', host=host)
                label = info.get('ID_FS_LABEL', partid)

            ret.append(_build_partition_entry(stub, partid, fstype, False,
                                              "/media/removable/%s" % label,
                                              is_removable, lsusb_info,
                                              host))
    return ret


def get_device_info(blockdev, lsusb_info, host=None):
    """Retrieve information about |blockdev|

    @see |get_partition_info()| doc for the dictionary format

    @param blockdev: a block device name, e.g., "sda".
    @param lsusb_info: lsusb info
    @param host: DUT object
    @return a list of dictionary, with each item representing a found device
    """
    ret = []

    spath = "%s/%s" % (INFO_PATH, blockdev)
    block_size = int(read_file("%s/queue/physical_block_size" % spath,
                               host))
    is_removable = bool(int(read_file("%s/removable" % spath, host)))

    info = get_udev_info(blockdev, "udev", host=host)
    dev_bus = info['ID_BUS']
    dev_model = info['ID_MODEL']
    dev_fs = info.get('ID_FS_TYPE', None)
    dev_uuid = info.get('ID_FS_UUID', None)
    dev_label = info.get('ID_FS_LABEL', dev_uuid)

    has_partitions = False
    for basename in system_output('ls %s' % spath, host).splitlines():
        partition_path = "%s/%s" % (spath, basename)
        # we want to check if within |spath| there are subdevices with
        # partitions
        # e.g., if within /sys/block/sda sda1 and other partition are present
        if not re.match("%s[0-9]+" % blockdev, basename):
            continue # ignore what is not a subdevice

        # |blockdev| has subdevices: get info for them
        has_partitions = True
        devs = get_partition_info(partition_path, dev_bus, dev_model,
                                  block_size=block_size,
                                  is_removable=is_removable,
                                  lsusb_info=lsusb_info, host=host)
        ret.extend(devs)

    if not has_partitions:
        # No subdevice found: describe the whole block device instead,
        # passing along what udev reported for it.
        devs = get_partition_info(spath, dev_bus, dev_model, dev_uuid, dev_fs,
                                  dev_label, block_size=block_size,
                                  is_removable=is_removable,
                                  lsusb_info=lsusb_info, host=host)
        ret.extend(devs)

    return ret


def get_all(host=None):
    """Return all removable or USB storage devices attached

    @param host: DUT object
    @return a list of dictionaries, each list element describing a device
    """
    ret = []
    lsusb_info = get_lsusb_info(host)
    for dev in system_output('ls %s' % INFO_PATH, host).splitlines():
        # Among block devices we need to filter out what are virtual
        if re.match("s[a-z]+", dev):
            # for each of them try to obtain some info
            ret.extend(get_device_info(dev, lsusb_info, host=host))
    return ret


def main():
    """Print one line per device returned by |get_all()|"""
    for device in get_all():
        print("%(device)s %(bus)s %(model)s %(size)d %(fs_uuid)s %(fstype)s "
              "%(is_mounted)d %(mountpoint)s %(usb_type)s %(serial)s" %
              device)


if __name__ == "__main__":
    main()