• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2
3#  Copyright (C) 2024 The Android Open Source Project
4#
5#  Licensed under the Apache License, Version 2.0 (the "License");
6#  you may not use this file except in compliance with the License.
7#  You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#  Unless required by applicable law or agreed to in writing, software
12#  distributed under the License is distributed on an "AS IS" BASIS,
13#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#  See the License for the specific language governing permissions and
15#  limitations under the License.
16
17"""CLI uploader for Mobly test results to Resultstore."""
18
19import argparse
20import collections
21import dataclasses
22import datetime
23from importlib import resources
24import logging
25import mimetypes
26import pathlib
27import platform
28import shutil
29import subprocess
30import tempfile
31import warnings
32from xml.etree import ElementTree
33
34import google.auth
35from google.cloud import api_keys_v2
36from google.cloud import resourcemanager_v3
37from google.cloud import storage
38from googleapiclient import discovery
39
40import mobly_result_converter
41import resultstore_client
42
43with warnings.catch_warnings():
44    warnings.simplefilter('ignore')
45    from google.cloud.storage import transfer_manager
46
47
48_RESULTSTORE_SERVICE_NAME = 'resultstore'
49_API_VERSION = 'v2'
50_API_KEY_DISPLAY_NAME = 'resultstore'
51_DISCOVERY_SERVICE_URL = (
52    'https://{api}.googleapis.com/$discovery/rest?version={apiVersion}'
53)
54
55_TEST_XML = 'test.xml'
56_TEST_LOG = 'test.log'
57_UNDECLARED_OUTPUTS = 'undeclared_outputs'
58
59_TEST_SUMMARY_YAML = 'test_summary.yaml'
60_TEST_LOG_INFO = 'test_log.INFO'
61
62_SUITE_NAME = 'suite_name'
63_RUN_IDENTIFIER = 'run_identifier'
64
65_GCS_BASE_LINK = 'https://console.cloud.google.com/storage/browser'
66_GCS_DEFAULT_TIMEOUT_SECS = 300
67
68_ResultstoreTreeTags = mobly_result_converter.ResultstoreTreeTags
69_ResultstoreTreeAttributes = mobly_result_converter.ResultstoreTreeAttributes
70
71_Status = resultstore_client.Status
72
73
74@dataclasses.dataclass()
75class _TestResultInfo:
76    """Info from the parsed test summary used for the Resultstore invocation."""
77
78    # Aggregate status of the overall test run.
79    status: _Status = _Status.UNKNOWN
80    # Target ID for the test.
81    target_id: str | None = None
82
83
84def _setup_logging(verbose: bool) -> None:
85    """Configures the logging for this module."""
86    debug_log_path = tempfile.mkstemp('_upload_log.txt')[1]
87    file_handler = logging.FileHandler(debug_log_path)
88    file_handler.setLevel(logging.DEBUG)
89    file_handler.setFormatter(logging.Formatter(
90        '%(asctime)s %(levelname)s [%(module)s.%(funcName)s] %(message)s'
91    ))
92    stream_handler = logging.StreamHandler()
93    stream_handler.setLevel(logging.DEBUG if verbose else logging.INFO)
94    stream_handler.setFormatter(
95        logging.Formatter('%(levelname)s: %(message)s'))
96    logging.basicConfig(
97        level=logging.DEBUG,
98        handlers=(file_handler, stream_handler)
99    )
100
101    logging.getLogger('googleapiclient').setLevel(logging.WARNING)
102    logging.getLogger('google.auth').setLevel(logging.ERROR)
103    logging.info('Debug logs are saved to %s', debug_log_path)
104    print('-' * 50)
105
106
107def _gcloud_login_and_set_project() -> None:
108    """Get gcloud application default creds and set the desired GCP project."""
109    logging.info('No credentials found. Performing initial setup.')
110    project_id = ''
111    while not project_id:
112        project_id = input('Enter your GCP project ID: ')
113    try:
114        subprocess.run(['gcloud', 'auth', 'application-default', 'login',
115                        '--no-launch-browser'])
116        subprocess.run(['gcloud', 'auth', 'application-default',
117                        'set-quota-project', project_id])
118    except FileNotFoundError:
119        logging.exception(
120            'Failed to run `gcloud` commands. Please install the `gcloud` CLI!')
121    logging.info('Initial setup complete!')
122    print('-' * 50)
123
124
125def _get_project_number(project_id: str) -> str:
126    """Get the project number associated with a GCP project ID."""
127    client = resourcemanager_v3.ProjectsClient()
128    response = client.get_project(name=f'projects/{project_id}')
129    return response.name.split('/', 1)[1]
130
131
132def _retrieve_api_key(project_id: str) -> str | None:
133    """Downloads the Resultstore API key for the given Google Cloud project."""
134    project_number = _get_project_number(project_id)
135    client = api_keys_v2.ApiKeysClient()
136    keys = client.list_keys(
137        parent=f'projects/{project_number}/locations/global'
138    ).keys
139    for key in keys:
140        if key.display_name == _API_KEY_DISPLAY_NAME:
141            return client.get_key_string(name=key.name).key_string
142    return None
143
144
145def _convert_results(
146        mobly_dir: pathlib.Path, dest_dir: pathlib.Path) -> _TestResultInfo:
147    """Converts Mobly test results into Resultstore test.xml and test.log."""
148    test_result_info = _TestResultInfo()
149    logging.info('Converting raw Mobly logs into Resultstore artifacts...')
150    # Generate the test.xml
151    mobly_yaml_path = mobly_dir.joinpath(_TEST_SUMMARY_YAML)
152    if mobly_yaml_path.is_file():
153        test_xml = mobly_result_converter.convert(mobly_yaml_path, mobly_dir)
154        ElementTree.indent(test_xml)
155        test_xml.write(
156            str(dest_dir.joinpath(_TEST_XML)),
157            encoding='utf-8',
158            xml_declaration=True,
159        )
160        test_result_info = _get_test_result_info_from_test_xml(test_xml)
161
162    # Copy test_log.INFO to test.log
163    test_log_info = mobly_dir.joinpath(_TEST_LOG_INFO)
164    if test_log_info.is_file():
165        shutil.copyfile(test_log_info, dest_dir.joinpath(_TEST_LOG))
166
167    return test_result_info
168
169
170def _aggregate_testcase_iteration_results(
171        iteration_results: list[str]):
172    """Determines the aggregate result from a list of test case iterations.
173
174    This is only applicable to test cases with repeat/retry.
175    """
176    iterations_failed = [
177        result == _Status.FAILED for result in iteration_results
178        if result != _Status.SKIPPED
179    ]
180    # Skip if all iterations skipped
181    if not iterations_failed:
182        return _Status.SKIPPED
183    # Fail if all iterations failed
184    if all(iterations_failed):
185        return _Status.FAILED
186    # Flaky if some iterations failed
187    if any(iterations_failed):
188        return _Status.FLAKY
189    # Pass otherwise
190    return _Status.PASSED
191
192
193def _aggregate_subtest_results(subtest_results: list[str]):
194    """Determines the aggregate result from a list of subtest nodes.
195
196    This is used to provide a test class result based on the test cases, or
197    a test suite result based on the test classes.
198    """
199    # Skip if all subtests skipped
200    if all([result == _Status.SKIPPED for result in subtest_results]):
201        return _Status.SKIPPED
202
203    any_flaky = False
204    for result in subtest_results:
205        # Fail if any subtest failed
206        if result == _Status.FAILED:
207            return _Status.FAILED
208        # Record flaky subtest
209        if result == _Status.FLAKY:
210            any_flaky = True
211    # Flaky if any subtest is flaky, pass otherwise
212    return _Status.FLAKY if any_flaky else _Status.PASSED
213
214
215def _get_test_status_from_xml(mobly_suite_element: ElementTree.Element):
216    """Gets the overall status from the test XML."""
217    test_class_elements = mobly_suite_element.findall(
218        f'./{_ResultstoreTreeTags.TESTSUITE.value}')
219    test_class_results = []
220    for test_class_element in test_class_elements:
221        test_case_results = []
222        test_case_iteration_results = collections.defaultdict(list)
223        test_case_elements = test_class_element.findall(
224            f'./{_ResultstoreTreeTags.TESTCASE.value}')
225        for test_case_element in test_case_elements:
226            result = _Status.PASSED
227            if test_case_element.get(
228                    _ResultstoreTreeAttributes.RESULT.value) == 'skipped':
229                result = _Status.SKIPPED
230            if (
231                    test_case_element.find(
232                        f'./{_ResultstoreTreeTags.FAILURE.value}') is not None
233                    or test_case_element.find(
234                        f'./{_ResultstoreTreeTags.ERROR.value}') is not None
235            ):
236                result = _Status.FAILED
237            # Add to iteration results if run as part of a repeat/retry
238            # Otherwise, add to test case results directly
239            if (
240                    test_case_element.get(
241                        _ResultstoreTreeAttributes.RETRY_NUMBER.value) or
242                    test_case_element.get(
243                        _ResultstoreTreeAttributes.REPEAT_NUMBER.value)
244            ):
245                test_case_iteration_results[
246                    test_case_element.get(_ResultstoreTreeAttributes.NAME.value)
247                ].append(result)
248            else:
249                test_case_results.append(result)
250
251        for iteration_result_list in test_case_iteration_results.values():
252            test_case_results.append(
253                _aggregate_testcase_iteration_results(iteration_result_list)
254            )
255        test_class_results.append(
256            _aggregate_subtest_results(test_case_results)
257        )
258    return _aggregate_subtest_results(test_class_results)
259
260
261def _get_test_result_info_from_test_xml(
262        test_xml: ElementTree.ElementTree,
263) -> _TestResultInfo:
264    """Parses a test_xml element into a _TestResultInfo."""
265    test_result_info = _TestResultInfo()
266    mobly_suite_element = test_xml.getroot().find(
267        f'./{_ResultstoreTreeTags.TESTSUITE.value}'
268    )
269    if mobly_suite_element is None:
270        return test_result_info
271    # Set aggregate test status
272    test_result_info.status = _get_test_status_from_xml(mobly_suite_element)
273
274    # Set target ID based on test class names, suite name, and custom run
275    # identifier.
276    suite_name_value = None
277    run_identifier_value = None
278    properties_element = mobly_suite_element.find(
279        f'./{_ResultstoreTreeTags.PROPERTIES.value}'
280    )
281    if properties_element is not None:
282        suite_name = properties_element.find(
283            f'./{_ResultstoreTreeTags.PROPERTY.value}'
284            f'[@{_ResultstoreTreeAttributes.NAME.value}="{_SUITE_NAME}"]'
285        )
286        if suite_name is not None:
287            suite_name_value = suite_name.get(
288                _ResultstoreTreeAttributes.VALUE.value
289            )
290        run_identifier = properties_element.find(
291            f'./{_ResultstoreTreeTags.PROPERTY.value}'
292            f'[@{_ResultstoreTreeAttributes.NAME.value}="{_RUN_IDENTIFIER}"]'
293        )
294        if run_identifier is not None:
295            run_identifier_value = run_identifier.get(
296                _ResultstoreTreeAttributes.VALUE.value
297            )
298    if suite_name_value:
299        target_id = suite_name_value
300    else:
301        test_class_elements = mobly_suite_element.findall(
302            f'./{_ResultstoreTreeTags.TESTSUITE.value}')
303        test_class_names = [
304            test_class_element.get(_ResultstoreTreeAttributes.NAME.value)
305            for test_class_element in test_class_elements
306        ]
307        target_id = '+'.join(test_class_names)
308    if run_identifier_value:
309        target_id = f'{target_id} {run_identifier_value}'
310
311    test_result_info.target_id = target_id
312    return test_result_info
313
314
315def _upload_dir_to_gcs(
316        src_dir: pathlib.Path, gcs_bucket: str, gcs_dir: str, timeout: int
317) -> list[str]:
318    """Uploads the given directory to a GCS bucket."""
319    # Set correct MIME types for certain text-format files.
320    with resources.as_file(
321            resources.files('data').joinpath('mime.types')) as path:
322        mimetypes.init([path])
323
324    bucket_obj = storage.Client().bucket(gcs_bucket)
325
326    glob = src_dir.rglob('*')
327    file_paths = [
328        str(path.relative_to(src_dir).as_posix())
329        for path in glob
330        if path.is_file()
331    ]
332
333    logging.info(
334        'Uploading %s files from %s to Cloud Storage directory %s/%s...',
335        len(file_paths),
336        str(src_dir),
337        gcs_bucket,
338        gcs_dir,
339    )
340    # Ensure that the destination directory has a trailing '/'.
341    blob_name_prefix = gcs_dir
342    if blob_name_prefix and not blob_name_prefix.endswith('/'):
343        blob_name_prefix += '/'
344
345    # If running on Windows, disable multiprocessing for upload.
346    worker_type = (
347        transfer_manager.THREAD
348        if platform.system() == 'Windows'
349        else transfer_manager.PROCESS
350    )
351    results = transfer_manager.upload_many_from_filenames(
352        bucket_obj,
353        file_paths,
354        source_directory=str(src_dir),
355        blob_name_prefix=blob_name_prefix,
356        skip_if_exists=True,
357        worker_type=worker_type,
358        upload_kwargs={'timeout': timeout},
359    )
360
361    success_paths = []
362    for file_path, result in zip(file_paths, results):
363        if isinstance(result, Exception):
364            logging.warning('Failed to upload %s. Error: %s', file_path, result)
365        else:
366            logging.debug('Uploaded %s.', file_path)
367            success_paths.append(file_path)
368
369    return [f'{gcs_dir}/{path}' for path in success_paths]
370
371
372def _upload_to_resultstore(
373        api_key: str,
374        gcs_bucket: str,
375        gcs_base_dir: str,
376        file_paths: list[str],
377        status: _Status,
378        target_id: str | None,
379        labels: list[str],
380) -> None:
381    """Uploads test results to Resultstore."""
382    logging.info('Generating Resultstore link...')
383    creds, project_id = google.auth.default()
384    service = discovery.build(
385        _RESULTSTORE_SERVICE_NAME,
386        _API_VERSION,
387        discoveryServiceUrl=_DISCOVERY_SERVICE_URL,
388        developerKey=api_key,
389    )
390    client = resultstore_client.ResultstoreClient(service, creds, project_id)
391    client.create_invocation(labels)
392    client.create_default_configuration()
393    client.create_target(target_id)
394    client.create_configured_target()
395    client.create_action(gcs_bucket, gcs_base_dir, file_paths)
396    client.set_status(status)
397    client.merge_configured_target()
398    client.finalize_configured_target()
399    client.merge_target()
400    client.finalize_target()
401    client.merge_invocation()
402    client.finalize_invocation()
403
404
405def main():
406    parser = argparse.ArgumentParser()
407    parser.add_argument(
408        '-v', '--verbose', action='store_true', help='Enable debug logs.'
409    )
410    parser.add_argument(
411        'mobly_dir',
412        help='Directory on host where Mobly results are stored.',
413    )
414    parser.add_argument(
415        '--gcs_bucket',
416        help='Bucket in GCS where test artifacts are uploaded. If unspecified, '
417             'use the current GCP project name as the bucket name.',
418    )
419    parser.add_argument(
420        '--gcs_dir',
421        help=(
422            'Directory to save test artifacts in GCS. If unspecified or empty, '
423            'use the current timestamp as the GCS directory name.'
424        ),
425    )
426    parser.add_argument(
427        '--gcs_upload_timeout',
428        type=int,
429        default=_GCS_DEFAULT_TIMEOUT_SECS,
430        help=(
431            'Timeout (in seconds) to upload each file to GCS. '
432            f'Default: {_GCS_DEFAULT_TIMEOUT_SECS} seconds.'),
433    )
434    parser.add_argument(
435        '--test_title',
436        help='Custom test title to display in the result UI.'
437    )
438    parser.add_argument(
439        '--label',
440        action='append',
441        help='Label to attach to the uploaded result. Can be repeated for '
442             'multiple labels.'
443    )
444    parser.add_argument(
445        '--no_convert_result',
446        action='store_true',
447        help=(
448            'Upload the files as is, without first converting Mobly results to '
449            'Resultstore\'s format. The source directory must contain at least '
450            'a `test.xml` file, and an `undeclared_outputs` zip or '
451            'subdirectory.')
452    )
453    args = parser.parse_args()
454    _setup_logging(args.verbose)
455    try:
456        _, project_id = google.auth.default()
457    except google.auth.exceptions.DefaultCredentialsError:
458        _gcloud_login_and_set_project()
459        _, project_id = google.auth.default()
460    logging.info('Current GCP project ID: %s', project_id)
461    api_key = _retrieve_api_key(project_id)
462    if api_key is None:
463        logging.error(
464            'No API key with name [%s] found for project [%s]. Contact the '
465            'project owner to create the required key.',
466            _API_KEY_DISPLAY_NAME, project_id
467        )
468        return
469    gcs_bucket = project_id if args.gcs_bucket is None else args.gcs_bucket
470    gcs_base_dir = pathlib.PurePath(
471        datetime.datetime.now().strftime('%Y%m%d-%H%M%S')
472        if not args.gcs_dir
473        else args.gcs_dir
474    )
475    mobly_dir = pathlib.Path(args.mobly_dir).absolute().expanduser()
476
477    if args.no_convert_result:
478        # Determine the final status based on the test.xml
479        test_xml = ElementTree.parse(mobly_dir.joinpath(_TEST_XML))
480        test_result_info = _get_test_result_info_from_test_xml(test_xml)
481        # Upload the contents of mobly_dir directly
482        gcs_files = _upload_dir_to_gcs(
483            mobly_dir, gcs_bucket, gcs_base_dir.as_posix(),
484            args.gcs_upload_timeout
485        )
486    else:
487        # Generate and upload test.xml and test.log
488        with tempfile.TemporaryDirectory() as tmp:
489            converted_dir = pathlib.Path(tmp).joinpath(gcs_base_dir)
490            converted_dir.mkdir(parents=True)
491            test_result_info = _convert_results(mobly_dir, converted_dir)
492            gcs_files = _upload_dir_to_gcs(
493                converted_dir, gcs_bucket, gcs_base_dir.as_posix(),
494                args.gcs_upload_timeout
495            )
496        # Upload raw Mobly logs to undeclared_outputs/ subdirectory
497        gcs_files += _upload_dir_to_gcs(
498            mobly_dir, gcs_bucket,
499            gcs_base_dir.joinpath(_UNDECLARED_OUTPUTS).as_posix(),
500            args.gcs_upload_timeout
501        )
502    _upload_to_resultstore(
503        api_key,
504        gcs_bucket,
505        gcs_base_dir.as_posix(),
506        gcs_files,
507        test_result_info.status,
508        args.test_title or test_result_info.target_id,
509        args.label
510    )
511
512
513if __name__ == '__main__':
514    main()
515