# Copyright (c) 2014 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import argparse
import copy
import csv
import logging
import os
import re
import shutil
import sys

# Sentinels used to classify a failing line in status.log.
CONNECT_FAIL = object()
CONFIG_FAIL = object()
# Directory the CSV summaries are written to; wiped by parse_results_dir().
RESULTS_DIR = '/tmp/chaos'


class ChaosParser(object):
    """Defines a parser for chaos test results"""

    def __init__(self, results_dir, create_file, print_config_failures):
        """ Constructs a parser interface.

        @param results_dir: complete path to results directory for a chaos
                            test.
        @param create_file: True to create csv files; False otherwise.
        @param print_config_failures: True to print the config info to stdout;
                                      False otherwise.

        """
        self._test_results_dir = results_dir
        self._create_file = create_file
        self._print_config_failures = print_config_failures


    def convert_set_to_string(self, set_list):
        """Converts a set to a single string.

        @param set_list: a set (or any iterable) to convert

        @returns a string, which is all items separated by the word 'and';
                 the empty string for an empty input.

        """
        return ' and '.join(str(item) for item in set_list)


    def create_csv(self, filename, data_list):
        """Creates a file in .csv format.

        The file is written to RESULTS_DIR, which is created if missing.
        I/O errors while writing are logged rather than raised.

        @param filename: name for the csv file (without the '.csv' suffix)
        @param data_list: a list of all the info to write to a file

        """
        if not os.path.exists(RESULTS_DIR):
            os.mkdir(RESULTS_DIR)
        try:
            path = os.path.join(RESULTS_DIR, filename + '.csv')
            # The csv module wants a binary file on Python 2 and a text file
            # opened with newline='' on Python 3.
            if sys.version_info[0] >= 3:
                csv_file = open(path, 'w', newline='')
            else:
                csv_file = open(path, 'wb')
            with csv_file as f:
                writer = csv.writer(f)
                writer.writerow(data_list)
                logging.info('Created CSV file %s', path)
        except IOError as e:
            logging.error('File operation failed with %s: %s', e.errno,
                           e.strerror)
            return


    def get_ap_name(self, line):
        """Gets the router name from the string passed.

        @param line: Test ERROR string from chaos status.log

        @returns the router name or brand.

        @raises AttributeError: if line contains no 'Router name: ' field.

        """
        router_info = re.search(r'Router name: ([\w\s]+)', line)
        return router_info.group(1)


    def get_ap_mode_chan_freq(self, ssid):
        """Gets the AP band from ssid using channel.

        The ssid is split on '_'; the second to last field carries the
        channel number and the third to last field the mode.

        @param ssid: A valid chaos test SSID as a string

        @returns a dict with keys 'mode', 'channel' and 'band'.

        """
        channel_security_info = ssid.split('_')
        channel_info = channel_security_info[-2]
        mode = channel_security_info[-3]
        # With a capturing group, re.split keeps the digits at index 1.
        channel = int(re.split(r'(\d+)', channel_info)[1])
        # TODO Choose if we want to keep band, we never put it in the
        # spreadsheet and is currently unused.
        if 1 <= channel <= 14:
            band = '2.4GHz'
        else:
            band = '5GHz'
        return {'mode': mode.upper(), 'channel': channel, 'band': band}


    def generate_percentage_string(self, passed_tests, total_tests):
        """Creates a pass percentage string in the formation x/y (zz%)

        @param passed_tests: int of passed tests
        @param total_tests: int of total tests

        @returns a formatted string as described above. When total_tests is
                 0 the percentage is reported as 0% instead of dividing by
                 zero.

        """
        if total_tests:
            percent = float(passed_tests) / float(total_tests) * 100
        else:
            # Nothing ran (or every AP failed configuration).
            percent = 0.0
        return '%d/%d (%d%%)' % (passed_tests, total_tests,
                                 int(round(percent)))


    def parse_keyval(self, filepath):
        """Parses the 'keyvalue' file to get device details.

        @param filepath: the complete path to the keyval file

        @returns a dict with keys 'board' and 'version'; either is 'unknown'
                 when the keyval file does not provide it.

        """
        # Android information does not exist in the keyfile, add temporary
        # information into the dictionary.  crbug.com/570408
        lsb_dict = {'board': 'unknown', 'version': 'unknown'}
        with open(filepath, 'r') as f:
            for line in f:
                key, _, value = line.partition('=')
                # Update in place so one key never discards the other.
                if 'RELEASE_BOARD' in key:
                    lsb_dict['board'] = value.rstrip()
                elif 'RELEASE_VERSION' in key:
                    lsb_dict['version'] = value.rstrip()
        return lsb_dict


    def parse_status_log(self, board, os_version, security, status_log_path):
        """Parses the entire status.log file from chaos test for test failures.
           and creates two CSV files for connect fail and configuration fail
           respectively.

        The connect failure summary is always printed to stdout; the config
        failure summaries and the CSV files depend on the constructor flags.

        @param board: the board the test was run against as a string
        @param os_version: the version of ChromeOS as a string
        @param security: the security used during the test as a string
        @param status_log_path: complete path to the status.log file

        """
        # Items that can have multiple values
        modes = list()
        channels = list()
        test_fail_aps = list()
        static_config_failures = list()
        dynamic_config_failures = list()
        kernel_version = ""
        fw_version = ""
        total = 0
        with open(status_log_path, 'r') as f:
            for line in f:
                line = line.strip()
                if line.startswith('START\tnetwork_WiFi'):
                    # Do not count PDU failures in total tests run.
                    if 'PDU' not in line:
                        total += 1
                elif 'kernel_version' in line:
                    kernel_version = re.search(r'[\d.]+', line).group(0)
                elif 'firmware_version' in line:
                    fw_version = re.search(
                            r"firmware_version': '([\w\s:().]+)",
                            line).group(1)
                elif line.startswith(('ERROR', 'FAIL')):
                    title_info = line.split()
                    if 'reboot' in title_info:
                        continue
                    if 'PDU' in title_info[1]:
                        continue
                    if 'Config' in title_info[1]:
                        failure_type = CONFIG_FAIL
                    else:
                        failure_type = CONNECT_FAIL

                    if (failure_type is CONFIG_FAIL and
                        'chromeos' in title_info[1]):
                        # Get the hostname for the AP that failed
                        # configuration.
                        ssid = title_info[1].split('.')[1].split('_')[0]
                    else:
                        # Get the router name, band for the AP that failed
                        # connect.
                        ssid = title_info[1].split('.')[1]
                        network_dict = self.get_ap_mode_chan_freq(ssid)
                        modes.append(network_dict['mode'])
                        channels.append(network_dict['channel'])

                    # Security mismatches and Ping failures are not connect
                    # failures.
                    if ('Ping command' in line or 'correct security' in line
                        or failure_type is CONFIG_FAIL):
                        if 'StaticAPConfigurator' in line:
                            static_config_failures.append(ssid)
                        else:
                            dynamic_config_failures.append(ssid)
                    else:
                        test_fail_aps.append(ssid)
                elif ('END GOOD' in line and
                      ('ChaosConnectDisconnect' in line or
                       'ChaosLongConnect' in line)):
                    test_name = line.split()[2]
                    ssid = test_name.split('.')[1]
                    network_dict = self.get_ap_mode_chan_freq(ssid)
                    modes.append(network_dict['mode'])
                    channels.append(network_dict['channel'])

        config_pass = total - (len(dynamic_config_failures) +
                               len(static_config_failures))
        config_pass_string = self.generate_percentage_string(config_pass,
                                                             total)
        # Connect results only make sense for APs that were configured.
        connect_pass = config_pass - len(test_fail_aps)
        connect_pass_string = self.generate_percentage_string(connect_pass,
                                                              config_pass)

        base_csv_list = [board, os_version, fw_version, kernel_version,
                         self.convert_set_to_string(set(modes)),
                         self.convert_set_to_string(set(channels)),
                         security]

        static_config_csv_list = copy.deepcopy(base_csv_list)
        static_config_csv_list.append(config_pass_string)
        static_config_csv_list.extend(static_config_failures)

        dynamic_config_csv_list = copy.deepcopy(base_csv_list)
        dynamic_config_csv_list.append(config_pass_string)
        dynamic_config_csv_list.extend(dynamic_config_failures)

        connect_csv_list = copy.deepcopy(base_csv_list)
        connect_csv_list.append(connect_pass_string)
        connect_csv_list.extend(test_fail_aps)

        # Single-argument print() calls behave the same on Python 2 and 3.
        print('Connect failure for security: %s' % security)
        print(','.join(connect_csv_list))
        print('\n')

        if self._print_config_failures:
            config_files = [('Static', static_config_csv_list),
                            ('Dynamic', dynamic_config_csv_list)]
            for config_type, config_csv_list in config_files:
                self.print_config_failures(config_type, security,
                                           config_csv_list)

        if self._create_file:
            self.create_csv('chaos_WiFi_dynamic_config_fail.' + security,
                            dynamic_config_csv_list)
            self.create_csv('chaos_WiFi_static_config_fail.' + security,
                            static_config_csv_list)
            self.create_csv('chaos_WiFi_connect_fail.' + security,
                            connect_csv_list)


    def print_config_failures(self, config_type, security, config_csv_list):
        """Prints out the configuration failures.

        Nothing is printed when the list holds no failures.

        @param config_type: string describing the configurator type
        @param security: the security type as a string
        @param config_csv_list: list of the configuration failures

        """
        # 8 because that is the length of the base list
        if len(config_csv_list) <= 8:
            return
        print('%s config failures for security: %s' % (config_type, security))
        print(','.join(config_csv_list))
        print('\n')


    def traverse_results_dir(self, path):
        """Walks through the results directory and get the pathnames for the
           status.log and the keyval files.

        The first status.log met during the walk is used. For keyval, only
        files containing 'param-debug_info' qualify and the last one met
        during the walk is used.

        @param path: complete path to a specific test result directory.

        @returns a dict with absolute pathnames for the 'status.log' and
                'keyfile' files.

        @raises Exception: if either file could not be found.

        """
        status = None
        keyval = None

        for root, _, file_names in os.walk(path):
            for name in file_names:
                current_path = os.path.join(root, name)
                if name == 'status.log':
                    if not status:
                        status = current_path
                elif name == 'keyval':
                    with open(current_path) as keyval_file:
                        is_test_keyval = any('param-debug_info' in keyval_line
                                             for keyval_line in keyval_file)
                    if is_test_keyval:
                        # This is a keyval file for a single test and not a
                        # suite. Keep scanning the directory so a status.log
                        # listed after it is not skipped.
                        keyval = current_path
        if not keyval or not status:
            raise Exception('Did Chaos tests complete successfully? Rerun tests'
                            ' with missing results.')
        return {'status_file': status, 'keyval_file': keyval}


    def parse_results_dir(self):
        """Parses each result directory.

        For each results directory created by test_that, parse it and
        create summary files. The text after the first '.' in the directory
        name is passed to parse_status_log as the security type. Any existing
        RESULTS_DIR is removed first.

        @raises RuntimeError: if no results directory was processed.

        """
        if os.path.exists(RESULTS_DIR):
            shutil.rmtree(RESULTS_DIR)
        test_processed = False
        for results_dir in os.listdir(self._test_results_dir):
            if 'results' not in results_dir:
                continue
            path = os.path.join(self._test_results_dir, results_dir)
            name_parts = results_dir.split('.')
            if len(name_parts) < 2 or not os.path.isdir(path):
                logging.warning('chaos_parse: skipping %s, expected a '
                                'directory named name.security', path)
                continue
            test = name_parts[1]
            status_key_dict = self.traverse_results_dir(path)
            status_log_path = status_key_dict['status_file']
            lsb_info = self.parse_keyval(status_key_dict['keyval_file'])
            self.parse_status_log(lsb_info['board'],
                                  lsb_info['version'],
                                  test,
                                  status_log_path)
            test_processed = True
        if not test_processed:
            raise RuntimeError('chaos_parse: Did not find any results directory'
                               ' to process')


def main():
    """Main function to call the parser."""
    logging.basicConfig(level=logging.INFO)
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument('-d', '--directory', dest='dir_name',
                            help='Pathname to results generated by test_that',
                            required=True)
    arg_parser.add_argument('--create_file', dest='create_file',
                            action='store_true', default=False)
    arg_parser.add_argument('--print_config_failures',
                            dest='print_config_failures',
                            action='store_true',
                            default=False)
    arguments = arg_parser.parse_args()
    # argparse enforces presence; this still rejects an empty string.
    if not arguments.dir_name:
        raise RuntimeError('chaos_parser: No directory name supplied. Use -h'
                           ' for help')
    if not os.path.exists(arguments.dir_name):
        raise RuntimeError('chaos_parser: Invalid directory name supplied.')
    parser = ChaosParser(arguments.dir_name, arguments.create_file,
                         arguments.print_config_failures)
    parser.parse_results_dir()


if __name__ == '__main__':
    main()