1#!/usr/bin/env python3 2 3# Copyright (C) 2024 The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17"""CLI uploader for Mobly test results to Resultstore.""" 18 19import argparse 20import collections 21import dataclasses 22import datetime 23from importlib import resources 24import logging 25import mimetypes 26import pathlib 27import platform 28import shutil 29import subprocess 30import tempfile 31import warnings 32from xml.etree import ElementTree 33 34import google.auth 35from google.cloud import api_keys_v2 36from google.cloud import resourcemanager_v3 37from google.cloud import storage 38from googleapiclient import discovery 39 40import mobly_result_converter 41import resultstore_client 42 43with warnings.catch_warnings(): 44 warnings.simplefilter('ignore') 45 from google.cloud.storage import transfer_manager 46 47 48_RESULTSTORE_SERVICE_NAME = 'resultstore' 49_API_VERSION = 'v2' 50_API_KEY_DISPLAY_NAME = 'resultstore' 51_DISCOVERY_SERVICE_URL = ( 52 'https://{api}.googleapis.com/$discovery/rest?version={apiVersion}' 53) 54 55_TEST_XML = 'test.xml' 56_TEST_LOG = 'test.log' 57_UNDECLARED_OUTPUTS = 'undeclared_outputs' 58 59_TEST_SUMMARY_YAML = 'test_summary.yaml' 60_TEST_LOG_INFO = 'test_log.INFO' 61 62_SUITE_NAME = 'suite_name' 63_RUN_IDENTIFIER = 'run_identifier' 64 65_GCS_BASE_LINK = 'https://console.cloud.google.com/storage/browser' 66_GCS_DEFAULT_TIMEOUT_SECS = 300 67 68_ResultstoreTreeTags = mobly_result_converter.ResultstoreTreeTags 69_ResultstoreTreeAttributes = mobly_result_converter.ResultstoreTreeAttributes 70 71_Status = resultstore_client.Status 72 73 74@dataclasses.dataclass() 75class _TestResultInfo: 76 """Info from the parsed test summary used for the Resultstore invocation.""" 77 78 # Aggregate status of the overall test run. 79 status: _Status = _Status.UNKNOWN 80 # Target ID for the test. 81 target_id: str | None = None 82 83 84def _setup_logging(verbose: bool) -> None: 85 """Configures the logging for this module.""" 86 debug_log_path = tempfile.mkstemp('_upload_log.txt')[1] 87 file_handler = logging.FileHandler(debug_log_path) 88 file_handler.setLevel(logging.DEBUG) 89 file_handler.setFormatter(logging.Formatter( 90 '%(asctime)s %(levelname)s [%(module)s.%(funcName)s] %(message)s' 91 )) 92 stream_handler = logging.StreamHandler() 93 stream_handler.setLevel(logging.DEBUG if verbose else logging.INFO) 94 stream_handler.setFormatter( 95 logging.Formatter('%(levelname)s: %(message)s')) 96 logging.basicConfig( 97 level=logging.DEBUG, 98 handlers=(file_handler, stream_handler) 99 ) 100 101 logging.getLogger('googleapiclient').setLevel(logging.WARNING) 102 logging.getLogger('google.auth').setLevel(logging.ERROR) 103 logging.info('Debug logs are saved to %s', debug_log_path) 104 print('-' * 50) 105 106 107def _gcloud_login_and_set_project() -> None: 108 """Get gcloud application default creds and set the desired GCP project.""" 109 logging.info('No credentials found. Performing initial setup.') 110 project_id = '' 111 while not project_id: 112 project_id = input('Enter your GCP project ID: ') 113 try: 114 subprocess.run(['gcloud', 'auth', 'application-default', 'login', 115 '--no-launch-browser']) 116 subprocess.run(['gcloud', 'auth', 'application-default', 117 'set-quota-project', project_id]) 118 except FileNotFoundError: 119 logging.exception( 120 'Failed to run `gcloud` commands. Please install the `gcloud` CLI!') 121 logging.info('Initial setup complete!') 122 print('-' * 50) 123 124 125def _get_project_number(project_id: str) -> str: 126 """Get the project number associated with a GCP project ID.""" 127 client = resourcemanager_v3.ProjectsClient() 128 response = client.get_project(name=f'projects/{project_id}') 129 return response.name.split('/', 1)[1] 130 131 132def _retrieve_api_key(project_id: str) -> str | None: 133 """Downloads the Resultstore API key for the given Google Cloud project.""" 134 project_number = _get_project_number(project_id) 135 client = api_keys_v2.ApiKeysClient() 136 keys = client.list_keys( 137 parent=f'projects/{project_number}/locations/global' 138 ).keys 139 for key in keys: 140 if key.display_name == _API_KEY_DISPLAY_NAME: 141 return client.get_key_string(name=key.name).key_string 142 return None 143 144 145def _convert_results( 146 mobly_dir: pathlib.Path, dest_dir: pathlib.Path) -> _TestResultInfo: 147 """Converts Mobly test results into Resultstore test.xml and test.log.""" 148 test_result_info = _TestResultInfo() 149 logging.info('Converting raw Mobly logs into Resultstore artifacts...') 150 # Generate the test.xml 151 mobly_yaml_path = mobly_dir.joinpath(_TEST_SUMMARY_YAML) 152 if mobly_yaml_path.is_file(): 153 test_xml = mobly_result_converter.convert(mobly_yaml_path, mobly_dir) 154 ElementTree.indent(test_xml) 155 test_xml.write( 156 str(dest_dir.joinpath(_TEST_XML)), 157 encoding='utf-8', 158 xml_declaration=True, 159 ) 160 test_result_info = _get_test_result_info_from_test_xml(test_xml) 161 162 # Copy test_log.INFO to test.log 163 test_log_info = mobly_dir.joinpath(_TEST_LOG_INFO) 164 if test_log_info.is_file(): 165 shutil.copyfile(test_log_info, dest_dir.joinpath(_TEST_LOG)) 166 167 return test_result_info 168 169 170def _aggregate_testcase_iteration_results( 171 iteration_results: list[str]): 172 """Determines the aggregate result from a list of test case iterations. 173 174 This is only applicable to test cases with repeat/retry. 175 """ 176 iterations_failed = [ 177 result == _Status.FAILED for result in iteration_results 178 if result != _Status.SKIPPED 179 ] 180 # Skip if all iterations skipped 181 if not iterations_failed: 182 return _Status.SKIPPED 183 # Fail if all iterations failed 184 if all(iterations_failed): 185 return _Status.FAILED 186 # Flaky if some iterations failed 187 if any(iterations_failed): 188 return _Status.FLAKY 189 # Pass otherwise 190 return _Status.PASSED 191 192 193def _aggregate_subtest_results(subtest_results: list[str]): 194 """Determines the aggregate result from a list of subtest nodes. 195 196 This is used to provide a test class result based on the test cases, or 197 a test suite result based on the test classes. 198 """ 199 # Skip if all subtests skipped 200 if all([result == _Status.SKIPPED for result in subtest_results]): 201 return _Status.SKIPPED 202 203 any_flaky = False 204 for result in subtest_results: 205 # Fail if any subtest failed 206 if result == _Status.FAILED: 207 return _Status.FAILED 208 # Record flaky subtest 209 if result == _Status.FLAKY: 210 any_flaky = True 211 # Flaky if any subtest is flaky, pass otherwise 212 return _Status.FLAKY if any_flaky else _Status.PASSED 213 214 215def _get_test_status_from_xml(mobly_suite_element: ElementTree.Element): 216 """Gets the overall status from the test XML.""" 217 test_class_elements = mobly_suite_element.findall( 218 f'./{_ResultstoreTreeTags.TESTSUITE.value}') 219 test_class_results = [] 220 for test_class_element in test_class_elements: 221 test_case_results = [] 222 test_case_iteration_results = collections.defaultdict(list) 223 test_case_elements = test_class_element.findall( 224 f'./{_ResultstoreTreeTags.TESTCASE.value}') 225 for test_case_element in test_case_elements: 226 result = _Status.PASSED 227 if test_case_element.get( 228 _ResultstoreTreeAttributes.RESULT.value) == 'skipped': 229 result = _Status.SKIPPED 230 if ( 231 test_case_element.find( 232 f'./{_ResultstoreTreeTags.FAILURE.value}') is not None 233 or test_case_element.find( 234 f'./{_ResultstoreTreeTags.ERROR.value}') is not None 235 ): 236 result = _Status.FAILED 237 # Add to iteration results if run as part of a repeat/retry 238 # Otherwise, add to test case results directly 239 if ( 240 test_case_element.get( 241 _ResultstoreTreeAttributes.RETRY_NUMBER.value) or 242 test_case_element.get( 243 _ResultstoreTreeAttributes.REPEAT_NUMBER.value) 244 ): 245 test_case_iteration_results[ 246 test_case_element.get(_ResultstoreTreeAttributes.NAME.value) 247 ].append(result) 248 else: 249 test_case_results.append(result) 250 251 for iteration_result_list in test_case_iteration_results.values(): 252 test_case_results.append( 253 _aggregate_testcase_iteration_results(iteration_result_list) 254 ) 255 test_class_results.append( 256 _aggregate_subtest_results(test_case_results) 257 ) 258 return _aggregate_subtest_results(test_class_results) 259 260 261def _get_test_result_info_from_test_xml( 262 test_xml: ElementTree.ElementTree, 263) -> _TestResultInfo: 264 """Parses a test_xml element into a _TestResultInfo.""" 265 test_result_info = _TestResultInfo() 266 mobly_suite_element = test_xml.getroot().find( 267 f'./{_ResultstoreTreeTags.TESTSUITE.value}' 268 ) 269 if mobly_suite_element is None: 270 return test_result_info 271 # Set aggregate test status 272 test_result_info.status = _get_test_status_from_xml(mobly_suite_element) 273 274 # Set target ID based on test class names, suite name, and custom run 275 # identifier. 276 suite_name_value = None 277 run_identifier_value = None 278 properties_element = mobly_suite_element.find( 279 f'./{_ResultstoreTreeTags.PROPERTIES.value}' 280 ) 281 if properties_element is not None: 282 suite_name = properties_element.find( 283 f'./{_ResultstoreTreeTags.PROPERTY.value}' 284 f'[@{_ResultstoreTreeAttributes.NAME.value}="{_SUITE_NAME}"]' 285 ) 286 if suite_name is not None: 287 suite_name_value = suite_name.get( 288 _ResultstoreTreeAttributes.VALUE.value 289 ) 290 run_identifier = properties_element.find( 291 f'./{_ResultstoreTreeTags.PROPERTY.value}' 292 f'[@{_ResultstoreTreeAttributes.NAME.value}="{_RUN_IDENTIFIER}"]' 293 ) 294 if run_identifier is not None: 295 run_identifier_value = run_identifier.get( 296 _ResultstoreTreeAttributes.VALUE.value 297 ) 298 if suite_name_value: 299 target_id = suite_name_value 300 else: 301 test_class_elements = mobly_suite_element.findall( 302 f'./{_ResultstoreTreeTags.TESTSUITE.value}') 303 test_class_names = [ 304 test_class_element.get(_ResultstoreTreeAttributes.NAME.value) 305 for test_class_element in test_class_elements 306 ] 307 target_id = '+'.join(test_class_names) 308 if run_identifier_value: 309 target_id = f'{target_id} {run_identifier_value}' 310 311 test_result_info.target_id = target_id 312 return test_result_info 313 314 315def _upload_dir_to_gcs( 316 src_dir: pathlib.Path, gcs_bucket: str, gcs_dir: str, timeout: int 317) -> list[str]: 318 """Uploads the given directory to a GCS bucket.""" 319 # Set correct MIME types for certain text-format files. 320 with resources.as_file( 321 resources.files('data').joinpath('mime.types')) as path: 322 mimetypes.init([path]) 323 324 bucket_obj = storage.Client().bucket(gcs_bucket) 325 326 glob = src_dir.rglob('*') 327 file_paths = [ 328 str(path.relative_to(src_dir).as_posix()) 329 for path in glob 330 if path.is_file() 331 ] 332 333 logging.info( 334 'Uploading %s files from %s to Cloud Storage directory %s/%s...', 335 len(file_paths), 336 str(src_dir), 337 gcs_bucket, 338 gcs_dir, 339 ) 340 # Ensure that the destination directory has a trailing '/'. 341 blob_name_prefix = gcs_dir 342 if blob_name_prefix and not blob_name_prefix.endswith('/'): 343 blob_name_prefix += '/' 344 345 # If running on Windows, disable multiprocessing for upload. 346 worker_type = ( 347 transfer_manager.THREAD 348 if platform.system() == 'Windows' 349 else transfer_manager.PROCESS 350 ) 351 results = transfer_manager.upload_many_from_filenames( 352 bucket_obj, 353 file_paths, 354 source_directory=str(src_dir), 355 blob_name_prefix=blob_name_prefix, 356 skip_if_exists=True, 357 worker_type=worker_type, 358 upload_kwargs={'timeout': timeout}, 359 ) 360 361 success_paths = [] 362 for file_path, result in zip(file_paths, results): 363 if isinstance(result, Exception): 364 logging.warning('Failed to upload %s. Error: %s', file_path, result) 365 else: 366 logging.debug('Uploaded %s.', file_path) 367 success_paths.append(file_path) 368 369 return [f'{gcs_dir}/{path}' for path in success_paths] 370 371 372def _upload_to_resultstore( 373 api_key: str, 374 gcs_bucket: str, 375 gcs_base_dir: str, 376 file_paths: list[str], 377 status: _Status, 378 target_id: str | None, 379 labels: list[str], 380) -> None: 381 """Uploads test results to Resultstore.""" 382 logging.info('Generating Resultstore link...') 383 creds, project_id = google.auth.default() 384 service = discovery.build( 385 _RESULTSTORE_SERVICE_NAME, 386 _API_VERSION, 387 discoveryServiceUrl=_DISCOVERY_SERVICE_URL, 388 developerKey=api_key, 389 ) 390 client = resultstore_client.ResultstoreClient(service, creds, project_id) 391 client.create_invocation(labels) 392 client.create_default_configuration() 393 client.create_target(target_id) 394 client.create_configured_target() 395 client.create_action(gcs_bucket, gcs_base_dir, file_paths) 396 client.set_status(status) 397 client.merge_configured_target() 398 client.finalize_configured_target() 399 client.merge_target() 400 client.finalize_target() 401 client.merge_invocation() 402 client.finalize_invocation() 403 404 405def main(): 406 parser = argparse.ArgumentParser() 407 parser.add_argument( 408 '-v', '--verbose', action='store_true', help='Enable debug logs.' 409 ) 410 parser.add_argument( 411 'mobly_dir', 412 help='Directory on host where Mobly results are stored.', 413 ) 414 parser.add_argument( 415 '--gcs_bucket', 416 help='Bucket in GCS where test artifacts are uploaded. If unspecified, ' 417 'use the current GCP project name as the bucket name.', 418 ) 419 parser.add_argument( 420 '--gcs_dir', 421 help=( 422 'Directory to save test artifacts in GCS. If unspecified or empty, ' 423 'use the current timestamp as the GCS directory name.' 424 ), 425 ) 426 parser.add_argument( 427 '--gcs_upload_timeout', 428 type=int, 429 default=_GCS_DEFAULT_TIMEOUT_SECS, 430 help=( 431 'Timeout (in seconds) to upload each file to GCS. ' 432 f'Default: {_GCS_DEFAULT_TIMEOUT_SECS} seconds.'), 433 ) 434 parser.add_argument( 435 '--test_title', 436 help='Custom test title to display in the result UI.' 437 ) 438 parser.add_argument( 439 '--label', 440 action='append', 441 help='Label to attach to the uploaded result. Can be repeated for ' 442 'multiple labels.' 443 ) 444 parser.add_argument( 445 '--no_convert_result', 446 action='store_true', 447 help=( 448 'Upload the files as is, without first converting Mobly results to ' 449 'Resultstore\'s format. The source directory must contain at least ' 450 'a `test.xml` file, and an `undeclared_outputs` zip or ' 451 'subdirectory.') 452 ) 453 args = parser.parse_args() 454 _setup_logging(args.verbose) 455 try: 456 _, project_id = google.auth.default() 457 except google.auth.exceptions.DefaultCredentialsError: 458 _gcloud_login_and_set_project() 459 _, project_id = google.auth.default() 460 logging.info('Current GCP project ID: %s', project_id) 461 api_key = _retrieve_api_key(project_id) 462 if api_key is None: 463 logging.error( 464 'No API key with name [%s] found for project [%s]. Contact the ' 465 'project owner to create the required key.', 466 _API_KEY_DISPLAY_NAME, project_id 467 ) 468 return 469 gcs_bucket = project_id if args.gcs_bucket is None else args.gcs_bucket 470 gcs_base_dir = pathlib.PurePath( 471 datetime.datetime.now().strftime('%Y%m%d-%H%M%S') 472 if not args.gcs_dir 473 else args.gcs_dir 474 ) 475 mobly_dir = pathlib.Path(args.mobly_dir).absolute().expanduser() 476 477 if args.no_convert_result: 478 # Determine the final status based on the test.xml 479 test_xml = ElementTree.parse(mobly_dir.joinpath(_TEST_XML)) 480 test_result_info = _get_test_result_info_from_test_xml(test_xml) 481 # Upload the contents of mobly_dir directly 482 gcs_files = _upload_dir_to_gcs( 483 mobly_dir, gcs_bucket, gcs_base_dir.as_posix(), 484 args.gcs_upload_timeout 485 ) 486 else: 487 # Generate and upload test.xml and test.log 488 with tempfile.TemporaryDirectory() as tmp: 489 converted_dir = pathlib.Path(tmp).joinpath(gcs_base_dir) 490 converted_dir.mkdir(parents=True) 491 test_result_info = _convert_results(mobly_dir, converted_dir) 492 gcs_files = _upload_dir_to_gcs( 493 converted_dir, gcs_bucket, gcs_base_dir.as_posix(), 494 args.gcs_upload_timeout 495 ) 496 # Upload raw Mobly logs to undeclared_outputs/ subdirectory 497 gcs_files += _upload_dir_to_gcs( 498 mobly_dir, gcs_bucket, 499 gcs_base_dir.joinpath(_UNDECLARED_OUTPUTS).as_posix(), 500 args.gcs_upload_timeout 501 ) 502 _upload_to_resultstore( 503 api_key, 504 gcs_bucket, 505 gcs_base_dir.as_posix(), 506 gcs_files, 507 test_result_info.status, 508 args.test_title or test_result_info.target_id, 509 args.label 510 ) 511 512 513if __name__ == '__main__': 514 main() 515