#!/usr/bin/env python3 # # Copyright 2017 The Chromium Authors # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. import argparse import collections import contextlib import json import logging import tempfile import os import sys try: from urllib.parse import urlencode from urllib.request import urlopen except ImportError: from urllib import urlencode from urllib2 import urlopen CURRENT_DIR = os.path.dirname(os.path.abspath(__file__)) BASE_DIR = os.path.abspath(os.path.join( CURRENT_DIR, '..', '..', '..', '..', '..')) sys.path.append(os.path.join(BASE_DIR, 'build', 'android')) from pylib.results.presentation import standard_gtest_merge from pylib.utils import google_storage_helper # pylint: disable=import-error sys.path.append(os.path.join(BASE_DIR, 'third_party')) import jinja2 # pylint: disable=import-error JINJA_ENVIRONMENT = jinja2.Environment( loader=jinja2.FileSystemLoader(os.path.dirname(__file__)), autoescape=True) def cell(data, html_class='center'): """Formats table cell data for processing in jinja template.""" return { 'data': data, 'class': html_class, } def pre_cell(data, html_class='center'): """Formats table
cell data for processing in jinja template.""" return { 'cell_type': 'pre', 'data': data, 'class': html_class, } class LinkTarget: # Opens the linked document in a new window or tab. NEW_TAB = '_blank' # Opens the linked document in the same frame as it was clicked. CURRENT_TAB = '_self' def link(data, href, target=LinkTarget.CURRENT_TAB): """Formats tag data for processing in jinja template. Args: data: String link appears as on HTML page. href: URL where link goes. target: Where link should be opened (e.g. current tab or new tab). """ return { 'data': data, 'href': href, 'target': target, } def links_cell(links, html_class='center', rowspan=None): """Formats table cell with links for processing in jinja template. Args: links: List of link dictionaries. Use |link| function to generate them. html_class: Class for table cell. rowspan: Rowspan HTML attribute. """ return { 'cell_type': 'links', 'class': html_class, 'links': links, 'rowspan': rowspan, } def action_cell(action, data, html_class): """Formats table cell with javascript actions. Args: action: Javscript action. data: Data in cell. class: Class for table cell. """ return { 'cell_type': 'action', 'action': action, 'data': data, 'class': html_class, } def flakiness_dashbord_link(test_name, suite_name, bucket): # Assume the bucket will be like "foo-bar-baz", we will take "foo" # as the test_project. # Fallback to "chromium" if bucket is not passed, e.g. local_output=True test_project = bucket.split('-')[0] if bucket else 'chromium' query = '%s/%s' % (suite_name, test_name) url_args = urlencode([('t', 'TESTS'), ('q', query), ('tp', test_project)]) return 'https://ci.chromium.org/ui/search?%s' % url_args def logs_cell(result, test_name, suite_name, bucket): """Formats result logs data for processing in jinja template.""" link_list = [] result_link_dict = result.get('links', {}) result_link_dict['flakiness'] = flakiness_dashbord_link( test_name, suite_name, bucket) for name, href in sorted(result_link_dict.items()): link_list.append(link( data=name, href=href, target=LinkTarget.NEW_TAB)) if link_list: return links_cell(link_list) return cell('(no logs)') def code_search(test, cs_base_url): """Returns URL for test on codesearch.""" search = test.replace('#', '.') return '%s/search/?q=%s&type=cs' % (cs_base_url, search) def status_class(status): """Returns HTML class for test status.""" if not status: return 'failure unknown' status = status.lower() if status not in ('success', 'skipped'): return 'failure %s' % status return status def create_test_table(results_dict, cs_base_url, suite_name, bucket): """Format test data for injecting into HTML table.""" header_row = [ cell(data='test_name', html_class='text'), cell(data='status', html_class='flaky'), cell(data='elapsed_time_ms', html_class='number'), cell(data='logs', html_class='text'), cell(data='output_snippet', html_class='text'), ] test_row_blocks = [] for test_name, test_results in results_dict.items(): test_runs = [] for index, result in enumerate(test_results): if index == 0: test_run = [links_cell( links=[ link(href=code_search(test_name, cs_base_url), target=LinkTarget.NEW_TAB, data=test_name)], rowspan=len(test_results), html_class='left %s' % test_name )] # test_name else: test_run = [] test_run.extend([ cell(data=result['status'] or 'UNKNOWN', # status html_class=('center %s' % status_class(result['status']))), cell(data=result['elapsed_time_ms']), # elapsed_time_ms logs_cell(result, test_name, suite_name, bucket), # logs pre_cell(data=result['output_snippet'], # output_snippet html_class='left'), ]) test_runs.append(test_run) test_row_blocks.append(test_runs) return header_row, test_row_blocks def create_suite_table(results_dict): """Format test suite data for injecting into HTML table.""" SUCCESS_COUNT_INDEX = 1 FAIL_COUNT_INDEX = 2 ALL_COUNT_INDEX = 3 TIME_INDEX = 4 header_row = [ cell(data='suite_name', html_class='text'), cell(data='number_success_tests', html_class='number'), cell(data='number_fail_tests', html_class='number'), cell(data='all_tests', html_class='number'), cell(data='elapsed_time_ms', html_class='number'), ] footer_row = [ action_cell( 'showTestsOfOneSuiteOnlyWithNewState("TOTAL")', 'TOTAL', 'center' ), # TOTAL cell(data=0), # number_success_tests cell(data=0), # number_fail_tests cell(data=0), # all_tests cell(data=0), # elapsed_time_ms ] suite_row_dict = collections.defaultdict(lambda: [ # Note: |suite_name| will be given in the following for loop. # It is not assigned yet here. action_cell('showTestsOfOneSuiteOnlyWithNewState("%s")' % suite_name, suite_name, 'left'), # suite_name cell(data=0), # number_success_tests cell(data=0), # number_fail_tests cell(data=0), # all_tests cell(data=0), # elapsed_time_ms ]) for test_name, test_results in results_dict.items(): # TODO(mikecase): This logic doesn't work if there are multiple test runs. # That is, if 'per_iteration_data' has multiple entries. # Since we only care about the result of the last test run. result = test_results[-1] suite_name = (test_name.split('#')[0] if '#' in test_name else test_name.split('.')[0]) suite_row = suite_row_dict[suite_name] suite_row[ALL_COUNT_INDEX]['data'] += 1 footer_row[ALL_COUNT_INDEX]['data'] += 1 if result['status'] == 'SUCCESS': suite_row[SUCCESS_COUNT_INDEX]['data'] += 1 footer_row[SUCCESS_COUNT_INDEX]['data'] += 1 elif result['status'] != 'SKIPPED': suite_row[FAIL_COUNT_INDEX]['data'] += 1 footer_row[FAIL_COUNT_INDEX]['data'] += 1 # Some types of crashes can have 'null' values for elapsed_time_ms. if result['elapsed_time_ms'] is not None: suite_row[TIME_INDEX]['data'] += result['elapsed_time_ms'] footer_row[TIME_INDEX]['data'] += result['elapsed_time_ms'] for suite in list(suite_row_dict.values()): if suite[FAIL_COUNT_INDEX]['data'] > 0: suite[FAIL_COUNT_INDEX]['class'] += ' failure' else: suite[FAIL_COUNT_INDEX]['class'] += ' success' if footer_row[FAIL_COUNT_INDEX]['data'] > 0: footer_row[FAIL_COUNT_INDEX]['class'] += ' failure' else: footer_row[FAIL_COUNT_INDEX]['class'] += ' success' return (header_row, [[suite_row] for suite_row in list(suite_row_dict.values())], footer_row) def feedback_url(result_details_link): url_args = [ ('labels', 'Pri-2,Type-Bug,Restrict-View-Google'), ('summary', 'Result Details Feedback:'), ('components', 'Test>Android'), ] if result_details_link: url_args.append(('comment', 'Please check out: %s' % result_details_link)) url_args = urlencode(url_args) return 'https://bugs.chromium.org/p/chromium/issues/entry?%s' % url_args def results_to_html(results_dict, cs_base_url, bucket, test_name, builder_name, build_number, local_output): """Convert list of test results into html format. Args: local_output: Whether this results file is uploaded to Google Storage or just a local file. """ test_rows_header, test_rows = create_test_table( results_dict, cs_base_url, test_name, bucket) suite_rows_header, suite_rows, suite_row_footer = create_suite_table( results_dict) suite_table_values = { 'table_id': 'suite-table', 'table_headers': suite_rows_header, 'table_row_blocks': suite_rows, 'table_footer': suite_row_footer, } test_table_values = { 'table_id': 'test-table', 'table_headers': test_rows_header, 'table_row_blocks': test_rows, } main_template = JINJA_ENVIRONMENT.get_template( os.path.join('template', 'main.html')) if local_output: html_render = main_template.render( # pylint: disable=no-member { 'tb_values': [suite_table_values, test_table_values], 'feedback_url': feedback_url(None), }) return (html_render, None, None) dest = google_storage_helper.unique_name( '%s_%s_%s' % (test_name, builder_name, build_number)) result_details_link = google_storage_helper.get_url_link( dest, '%s/html' % bucket) html_render = main_template.render( # pylint: disable=no-member { 'tb_values': [suite_table_values, test_table_values], 'feedback_url': feedback_url(result_details_link), }) return (html_render, dest, result_details_link) def result_details(json_path, test_name, cs_base_url, bucket=None, builder_name=None, build_number=None, local_output=False): """Get result details from json path and then convert results to html. Args: local_output: Whether this results file is uploaded to Google Storage or just a local file. """ with open(json_path) as json_file: json_object = json.loads(json_file.read()) if not 'per_iteration_data' in json_object: return 'Error: json file missing per_iteration_data.' results_dict = collections.defaultdict(list) for testsuite_run in json_object['per_iteration_data']: for test, test_runs in testsuite_run.items(): results_dict[test].extend(test_runs) return results_to_html(results_dict, cs_base_url, bucket, test_name, builder_name, build_number, local_output) def upload_to_google_bucket(html, bucket, dest): with tempfile.NamedTemporaryFile(suffix='.html') as temp_file: temp_file.write(html) temp_file.flush() return google_storage_helper.upload( name=dest, filepath=temp_file.name, bucket='%s/html' % bucket, content_type='text/html', authenticated_link=True) def ui_screenshot_set(json_path): with open(json_path) as json_file: json_object = json.loads(json_file.read()) if not 'per_iteration_data' in json_object: # This will be reported as an error by result_details, no need to duplicate. return None ui_screenshots = [] # pylint: disable=too-many-nested-blocks for testsuite_run in json_object['per_iteration_data']: for _, test_runs in testsuite_run.items(): for test_run in test_runs: if 'ui screenshot' in test_run['links']: screenshot_link = test_run['links']['ui screenshot'] if screenshot_link.startswith('file:'): with contextlib.closing(urlopen(screenshot_link)) as f: test_screenshots = json.load(f) else: # Assume anything that isn't a file link is a google storage link screenshot_string = google_storage_helper.read_from_link( screenshot_link) if not screenshot_string: logging.error('Bad screenshot link %s', screenshot_link) continue test_screenshots = json.loads( screenshot_string) ui_screenshots.extend(test_screenshots) # pylint: enable=too-many-nested-blocks if ui_screenshots: return json.dumps(ui_screenshots) return None def upload_screenshot_set(json_path, test_name, bucket, builder_name, build_number): screenshot_set = ui_screenshot_set(json_path) if not screenshot_set: return None dest = google_storage_helper.unique_name( 'screenshots_%s_%s_%s' % (test_name, builder_name, build_number), suffix='.json') with tempfile.NamedTemporaryFile(mode='w', suffix='.json') as temp_file: temp_file.write(screenshot_set) temp_file.flush() return google_storage_helper.upload( name=dest, filepath=temp_file.name, bucket='%s/json' % bucket, content_type='application/json', authenticated_link=True) def main(): parser = argparse.ArgumentParser() parser.add_argument('--json-file', help='Path of json file.') parser.add_argument('--cs-base-url', help='Base url for code search.', default='http://cs.chromium.org') parser.add_argument('--bucket', help='Google storage bucket.', required=True) parser.add_argument('--builder-name', help='Builder name.') parser.add_argument('--build-number', help='Build number.') parser.add_argument('--test-name', help='The name of the test.', required=True) parser.add_argument( '-o', '--output-json', help='(Swarming Merge Script API) ' 'Output JSON file to create.') parser.add_argument( '--build-properties', help='(Swarming Merge Script API) ' 'Build property JSON file provided by recipes.') parser.add_argument( '--summary-json', help='(Swarming Merge Script API) ' 'Summary of shard state running on swarming. ' '(Output of the swarming.py collect ' '--task-summary-json=XXX command.)') parser.add_argument( '--task-output-dir', help='(Swarming Merge Script API) ' 'Directory containing all swarming task results.') parser.add_argument( 'positional', nargs='*', help='output.json from shards.') args = parser.parse_args() if ((args.build_properties is None) == (args.build_number is None or args.builder_name is None)): raise parser.error('Exactly one of build_perperties or ' '(build_number or builder_name) should be given.') if (args.build_number is None) != (args.builder_name is None): raise parser.error('args.build_number and args.builder_name ' 'has to be be given together' 'or not given at all.') if len(args.positional) == 0 and args.json_file is None: if args.output_json: with open(args.output_json, 'w') as f: json.dump({}, f) return if len(args.positional) != 0 and args.json_file: raise parser.error('Exactly one of args.positional and ' 'args.json_file should be given.') if args.build_properties: build_properties = json.loads(args.build_properties) if ((not 'buildnumber' in build_properties) or (not 'buildername' in build_properties)): raise parser.error('Build number/builder name not specified.') build_number = build_properties['buildnumber'] builder_name = build_properties['buildername'] elif args.build_number and args.builder_name: build_number = args.build_number builder_name = args.builder_name if args.positional: if len(args.positional) == 1: json_file = args.positional[0] else: if args.output_json and args.summary_json: standard_gtest_merge.standard_gtest_merge( args.output_json, args.summary_json, args.positional) json_file = args.output_json elif not args.output_json: raise Exception('output_json required by merge API is missing.') else: raise Exception('summary_json required by merge API is missing.') elif args.json_file: json_file = args.json_file if not os.path.exists(json_file): raise IOError('--json-file %s not found.' % json_file) # Link to result details presentation page is a part of the page. result_html_string, dest, result_details_link = result_details( json_file, args.test_name, args.cs_base_url, args.bucket, builder_name, build_number) result_details_link_2 = upload_to_google_bucket( result_html_string.encode('UTF-8'), args.bucket, dest) assert result_details_link == result_details_link_2, ( 'Result details link do not match. The link returned by get_url_link' ' should be the same as that returned by upload.') ui_screenshot_set_link = upload_screenshot_set(json_file, args.test_name, args.bucket, builder_name, build_number) if ui_screenshot_set_link: ui_catalog_url = 'https://chrome-ui-catalog.appspot.com/' ui_catalog_query = urlencode({'screenshot_source': ui_screenshot_set_link}) ui_screenshot_link = '%s?%s' % (ui_catalog_url, ui_catalog_query) if args.output_json: with open(json_file) as original_json_file: json_object = json.load(original_json_file) json_object['links'] = { 'result_details (logcats, flakiness links)': result_details_link } if ui_screenshot_set_link: json_object['links']['ui screenshots'] = ui_screenshot_link with open(args.output_json, 'w') as f: json.dump(json_object, f) else: print('Result Details: %s' % result_details_link) if ui_screenshot_set_link: print('UI Screenshots %s' % ui_screenshot_link) if __name__ == '__main__': sys.exit(main())