1# Copyright 2017 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5"""Check USB device by running linux command on CfM""" 6 7from __future__ import print_function 8 9import logging 10import re 11import time 12from autotest_lib.client.common_lib.cros.manual import get_usb_devices 13from autotest_lib.client.common_lib.cros import power_cycle_usb_util 14 15CORE_DIR_LINES = 3 16ATRUS = '18d1:8001' 17 18def check_chrome_logfile(dut): 19 """ 20 Get the latest chrome log file. 21 @param dut: The handle of the device under test. 22 @returns: the latest chrome log file 23 """ 24 output = None 25 logging.info('---Get the latest chrome log file') 26 cmd = 'ls -latr /var/log/chrome/chrome' 27 try: 28 output = dut.run(cmd, ignore_status=True).stdout 29 except Exception as e: 30 logging.exception('Fail to run command %s.', cmd) 31 return None 32 logging.info('---cmd: %s', cmd) 33 logging.info('---output: %s', output.lower().strip()) 34 return output 35 36def check_last_reboot(dut): 37 """ 38 Get the last line of eventlog.txt 39 @param dut: The handle of the device under test. 40 @returns: the last line in eventlog.txt 41 """ 42 output = None 43 cmd = 'tail -1 /var/log/eventlog.txt' 44 logging.info('---Get the latest reboot log') 45 try: 46 output = dut.run(cmd, ignore_status=True).stdout 47 except Exception as e: 48 logging.exception('Fail to run command %s.', cmd) 49 return None 50 logging.info('---cmd: %s', cmd) 51 logging.info('---output: %s', output.lower().strip()) 52 return output 53 54def check_is_platform(dut, name): 55 """ 56 Check whether CfM is expected platform. 57 @param dut: The handle of the device under test. 58 @param name: The name of platform 59 @returns: True, if CfM's platform is same as expected. 60 False, if not. 61 """ 62 cmd = "mosys platform name" 63 output = dut.run(cmd, ignore_status=True).stdout.strip() 64 logging.info('---cmd: %s', cmd) 65 logging.info('---output: %s', output.lower()) 66 return output.lower() == name 67 68 69def get_mgmt_ipv4(dut): 70 """ 71 Get mgmt ipv4 address 72 @param dut: The handle of the device under test. Should be initialized in 73 autotest. 74 @return: ipv4 address for mgmt interface. 75 """ 76 cmd = 'ifconfig -a | grep eth0 -A 2 | grep netmask' 77 try: 78 output = dut.run(cmd, ignore_status=True).stdout 79 except Exception as e: 80 logging.exception('Fail to run command %s.', cmd) 81 return None 82 ipv4 = re.findall(r"inet\s*([0-9.]+)\s*netmask.*", output)[0] 83 return ipv4 84 85 86def retrieve_usb_devices(dut): 87 """ 88 Populate output of usb-devices on CfM. 89 @param dut: handle of CfM under test 90 @returns dict of all usb devices detected on CfM. 91 """ 92 usb_devices = (dut.run('usb-devices', ignore_status=True). 93 stdout.strip().split('\n\n')) 94 usb_data = get_usb_devices.extract_usb_data( 95 '\nUSB-Device\n'+'\nUSB-Device\n'.join(usb_devices)) 96 return usb_data 97 98 99def extract_peripherals_for_cfm(usb_data): 100 """ 101 Check CfM has camera, speaker and Mimo connected. 102 @param usb_data: dict extracted from output of "usb-devices" 103 """ 104 peripheral_map = {} 105 106 speaker_list = get_usb_devices.get_speakers(usb_data) 107 camera_list = get_usb_devices.get_cameras(usb_data) 108 display_list = get_usb_devices.get_display_mimo(usb_data) 109 controller_list = get_usb_devices.get_controller_mimo(usb_data) 110 for pid_vid, device_count in speaker_list.iteritems(): 111 if device_count > 0: 112 peripheral_map[pid_vid] = device_count 113 114 for pid_vid, device_count in camera_list.iteritems(): 115 if device_count > 0: 116 peripheral_map[pid_vid] = device_count 117 118 for pid_vid, device_count in controller_list.iteritems(): 119 if device_count > 0: 120 peripheral_map[pid_vid] = device_count 121 122 for pid_vid, device_count in display_list.iteritems(): 123 if device_count > 0: 124 peripheral_map[pid_vid] = device_count 125 126 for pid_vid, device_count in peripheral_map.iteritems(): 127 logging.info('---device: %s (%s), count: %d', 128 pid_vid, get_usb_devices.get_device_prod(pid_vid), 129 device_count) 130 131 return peripheral_map 132 133 134def check_peripherals_for_cfm(peripheral_map): 135 """ 136 Check CfM has one and only one camera, 137 one and only one speaker, 138 or one and only one mimo. 139 @param peripheral_map: dict for connected camera, speaker, or mimo. 140 @returns: True if check passes, 141 False if check fails. 142 """ 143 peripherals = peripheral_map.keys() 144 145 type_camera = set(peripherals).intersection(get_usb_devices.CAMERA_LIST) 146 type_speaker = set(peripherals).intersection(get_usb_devices.SPEAKER_LIST) 147 type_controller = set(peripherals).intersection(\ 148 get_usb_devices.TOUCH_CONTROLLER_LIST) 149 type_panel = set(peripherals).intersection(\ 150 get_usb_devices.TOUCH_DISPLAY_LIST) 151 152 # check CfM have one, and only one type camera, huddly and mimo 153 if len(type_camera) == 0: 154 logging.info('No camera is found on CfM.') 155 return False 156 157 if not len(type_camera) == 1: 158 logging.info('More than one type of cameras are found on CfM.') 159 return False 160 161 if len(type_speaker) == 0: 162 logging.info('No speaker is found on CfM.') 163 return False 164 165 if not len(type_speaker) == 1: 166 logging.info('More than one type of speakers are found on CfM.') 167 return False 168 169 if len(type_controller) == 0: 170 logging.info('No controller is found on CfM.') 171 return False 172 173 174 if not len(type_controller) == 1: 175 logging.info('More than one type of controller are found on CfM.') 176 return False 177 178 if len(type_panel) == 0: 179 logging.info('No Display is found on CfM.') 180 return False 181 182 if not len(type_panel) == 1: 183 logging.info('More than one type of displays are found on CfM.') 184 return False 185 186 # check CfM have only one camera, huddly and mimo 187 for pid_vid, device_count in peripheral_map.iteritems(): 188 if device_count > 1: 189 logging.info('Number of device %s connected to CfM : %d', 190 get_usb_devices.get_device_prod(pid_vid), 191 device_count) 192 return False 193 194 return True 195 196 197def check_usb_enumeration(dut, puts): 198 """ 199 Check USB enumeration for devices 200 @param dut: the handle of CfM under test 201 @param puts: the list of peripherals under test 202 @returns True, none if test passes 203 False, errMsg if test test fails 204 """ 205 usb_data = retrieve_usb_devices(dut) 206 if not usb_data: 207 logging.warning('No usb devices found on DUT') 208 return False, 'No usb device found on DUT.' 209 else: 210 usb_device_list = extract_peripherals_for_cfm(usb_data) 211 logging.info('---usb device = %s', usb_device_list) 212 if not set(puts).issubset(set(usb_device_list.keys())): 213 logging.info('Detect device fails for usb enumeration') 214 logging.info('Expect enumerated devices: %s', puts) 215 logging.info('Actual enumerated devices: %s', 216 usb_device_list.keys()) 217 return False, 'Some usb devices are not found.' 218 return True, None 219 220 221def check_usb_interface_initializion(dut, puts): 222 """ 223 Check CfM shows valid interface for all peripherals connected. 224 @param dut: the handle of CfM under test 225 @param puts: the list of peripherals under test 226 @returns True, none if test passes 227 False, errMsg if test test fails 228 """ 229 usb_data = retrieve_usb_devices(dut) 230 for put in puts: 231 number, health = get_usb_devices.is_usb_device_ok(usb_data, put) 232 logging.info('---device interface = %d, %s for %s', 233 number, health, get_usb_devices.get_device_prod(put)) 234 if '0' in health: 235 logging.warning('Device %s has invalid interface', put) 236 return False, 'Device {} has invalid interface.'.format(put) 237 return True, None 238 239 240def clear_core_file(dut): 241 """clear core files""" 242 cmd = "rm -rf /var/spool/crash/*.*" 243 try: 244 dut.run_output(cmd) 245 except Exception as e: 246 logging.exception('Fail to clean core files under ' 247 '/var/spool/crash') 248 logging.exception('Fail to execute %s :', cmd) 249 250 251def check_process_crash(dut, cdlines): 252 """Check whether there is core file.""" 253 cmd = 'ls -latr /var/spool/crash' 254 try: 255 core_files_output = dut.run_output(cmd).splitlines() 256 except Exception as e: 257 logging.exception('Can not find file under /var/spool/crash.') 258 logging.exception('Fail to execute %s:', cmd) 259 return True, CORE_DIR_LINES 260 logging.info('---%s\n---%s', cmd, core_files_output) 261 if len(core_files_output) - cdlines <= 0: 262 logging.info('---length of files: %d', len(core_files_output)) 263 return True, len(core_files_output) 264 else: 265 return False, len(core_files_output) 266 267 268def gpio_usb_test(dut, gpio_list, device_list, pause, board): 269 """ 270 Run GPIO test to powercycle usb port. 271 @parama dut: handler of CfM, 272 @param gpio_list: the list of gpio ports, 273 @param device_list: the list of usb devices, 274 @param pause: time needs to wait before restoring power to usb port, 275 in seconds 276 @param board: board name for CfM 277 @returns True 278 """ 279 for device in device_list: 280 vid, pid = device.split(':') 281 logging.info('---going to powercyle device %s:%s', vid, pid) 282 try: 283 power_cycle_usb_util.power_cycle_usb_vidpid(dut, board, 284 vid, pid, pause) 285 except Exception as e: 286 errmsg = 'Fail to power cycle device.' 287 logging.exception('%s.', errmsg) 288 return False, errmsg 289 290 return True, None 291 292 293def reboot_test(dut, pause): 294 """ 295 Reboot CfM. 296 @parama dut: handler of CfM, 297 @param pause: time needs to wait after issuing reboot command, in seconds, 298 299 """ 300 try: 301 dut.reboot() 302 except Exception as e: 303 logging.exception('Fail to reboot CfM.') 304 return False 305 logging.info('---reboot done') 306 time.sleep(pause) 307 return True 308 309 310 311def find_last_log(dut, speaker): 312 """ 313 Get the lastlast_lines line for log files. 314 @param dut: handler of CfM 315 @param speaker: vidpid if speaker. 316 @returns: the list of string of the last line of logs. 317 """ 318 last_lines = { 319 'messages':[], 320 'chrome':[], 321 'ui': [], 322 'atrus': [] 323 } 324 logging.debug('Get the last line of log file, speaker %s', speaker) 325 try: 326 cmd = "tail -1 /var/log/messages | awk -v N=1 '{print $N}'" 327 last_lines['messages'] = dut.run_output(cmd).strip().split()[0] 328 cmd = "tail -1 /var/log/chrome/chrome | awk -v N=1 '{print $N}'" 329 last_lines['chrome'] = dut.run_output(cmd).strip().split()[0] 330 cmd = "tail -1 /var/log/ui/ui.LATEST | awk -v N=1 '{print $N}'" 331 last_lines['ui']= dut.run_output(cmd) 332 if speaker == ATRUS and check_is_platform(dut, 'guado'): 333 logging.info('---atrus speaker %s connected to CfM', speaker) 334 cmd = 'tail -1 /var/log/atrus.log | awk -v N=1 "{print $N}"' 335 last_lines['atrus'] = dut.run_output(cmd).strip().split()[0] 336 except Exception as e: 337 logging.exception('Fail to get the last line from log files.') 338 for item, timestamp in last_lines.iteritems(): 339 logging.debug('---%s: %s', item, timestamp) 340 return last_lines 341 342 343def collect_log_since_last_check(dut, lastlines, logfile): 344 """Collect log file since last check.""" 345 output = None 346 if logfile == "messages": 347 cmd ='awk \'/{}/,0\' /var/log/messages'.format(lastlines[logfile]) 348 if logfile == "chrome": 349 cmd ='awk \'/{}/,0\' /var/log/chrome/chrome'.format(lastlines[logfile]) 350 if logfile == "ui": 351 cmd ='awk \'/{}/,0\' /var/log/ui/ui.LATEST'.format(lastlines[logfile]) 352 if logfile == 'atrus': 353 cmd ='awk \'/{}/,0\' /var/log/atrus.log'.format(lastlines[logfile]) 354 logging.info('---cmd = %s', cmd) 355 try: 356 output = dut.run_output(cmd).split('\n') 357 except Exception as e: 358 logging.exception('Fail to get output from log files.') 359 logging.info('---length of log: %d', len(output)) 360 if not output: 361 logging.info('--fail to find match log, check the latest log.') 362 363 if not output: 364 if logfile == "messages": 365 cmd ='cat /var/log/messages' 366 if logfile == "chrome": 367 cmd ='cat /var/log/chrome/chrome' 368 if logfile == "ui": 369 cmd ='cat /var/log/ui/ui.LATEST' 370 if logfile == 'atrus': 371 cmd ='cat /var/log/atrus.log' 372 output = dut.run_output(cmd).split('\n') 373 logging.info('---length of log: %d', len(output)) 374 return output 375 376def check_log(dut, timestamp, error_list, checkitem, logfile): 377 """ 378 Check logfile does not contain any element in error_list[checkitem]. 379 """ 380 error_log_list = [] 381 logging.info('---now check log %s in file %s', checkitem, logfile) 382 output = collect_log_since_last_check(dut, timestamp, logfile) 383 for _error in error_list[checkitem]: 384 error_log_list.extend([s for s in output if _error in str(s)]) 385 if not error_log_list: 386 return True, None 387 else: 388 tempmsg = '\n'.join(error_log_list) 389 errmsg = 'Error_Found:in_log_file:{}:{}.'.format(logfile, tempmsg) 390 logging.info('---%s', errmsg) 391 return False, errmsg 392