# Copyright 2015 The Chromium Authors # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. import copy import logging import os import pickle import re import six from devil.android import apk_helper from pylib import constants from pylib.base import base_test_result from pylib.base import test_exception from pylib.base import test_instance from pylib.constants import host_paths from pylib.instrumentation import test_result from pylib.instrumentation import instrumentation_parser from pylib.symbols import deobfuscator from pylib.symbols import stack_symbolizer from pylib.utils import dexdump from pylib.utils import gold_utils from pylib.utils import shared_preference_utils from pylib.utils import test_filter with host_paths.SysPath(host_paths.BUILD_COMMON_PATH): import unittest_util # pylint: disable=import-error # Ref: http://developer.android.com/reference/android/app/Activity.html _ACTIVITY_RESULT_CANCELED = 0 _ACTIVITY_RESULT_OK = -1 _COMMAND_LINE_PARAMETER = 'cmdlinearg-parameter' _DEFAULT_ANNOTATIONS = [ 'SmallTest', 'MediumTest', 'LargeTest', 'EnormousTest', 'IntegrationTest'] # This annotation is for disabled tests that should not be run in Test Reviver. _DO_NOT_REVIVE_ANNOTATIONS = ['DoNotRevive', 'Manual'] _EXCLUDE_UNLESS_REQUESTED_ANNOTATIONS = [ 'DisabledTest', 'FlakyTest', 'Manual'] _VALID_ANNOTATIONS = set(_DEFAULT_ANNOTATIONS + _DO_NOT_REVIVE_ANNOTATIONS + _EXCLUDE_UNLESS_REQUESTED_ANNOTATIONS) _TEST_LIST_JUNIT4_RUNNERS = [ 'org.chromium.base.test.BaseChromiumAndroidJUnitRunner'] _SKIP_PARAMETERIZATION = 'SkipCommandLineParameterization' _PARAMETERIZED_COMMAND_LINE_FLAGS = 'ParameterizedCommandLineFlags' _PARAMETERIZED_COMMAND_LINE_FLAGS_SWITCHES = ( 'ParameterizedCommandLineFlags$Switches') _NATIVE_CRASH_RE = re.compile('(process|native) crash', re.IGNORECASE) _PICKLE_FORMAT_VERSION = 12 # The ID of the bundle value Instrumentation uses to report which test index the # results are for in a collection of tests. Note that this index is 1-based. _BUNDLE_CURRENT_ID = 'current' # The ID of the bundle value Instrumentation uses to report the test class. _BUNDLE_CLASS_ID = 'class' # The ID of the bundle value Instrumentation uses to report the test name. _BUNDLE_TEST_ID = 'test' # The ID of the bundle value Instrumentation uses to report if a test was # skipped. _BUNDLE_SKIPPED_ID = 'test_skipped' # The ID of the bundle value Instrumentation uses to report the crash stack, if # the test crashed. _BUNDLE_STACK_ID = 'stack' # The ID of the bundle value Chrome uses to report the test duration. _BUNDLE_DURATION_ID = 'duration_ms' class MissingSizeAnnotationError(test_exception.TestException): def __init__(self, class_name): super().__init__( class_name + ': Test method is missing required size annotation. Add one of: ' + ', '.join('@' + a for a in _VALID_ANNOTATIONS)) class CommandLineParameterizationException(test_exception.TestException): pass class TestListPickleException(test_exception.TestException): pass # TODO(jbudorick): Make these private class methods of # InstrumentationTestInstance once the instrumentation junit3_runner_class is # deprecated. def ParseAmInstrumentRawOutput(raw_output): """Parses the output of an |am instrument -r| call. Args: raw_output: the output of an |am instrument -r| call as a list of lines Returns: A 3-tuple containing: - the instrumentation code as an integer - the instrumentation result as a list of lines - the instrumentation statuses received as a list of 2-tuples containing: - the status code as an integer - the bundle dump as a dict mapping string keys to a list of strings, one for each line. """ parser = instrumentation_parser.InstrumentationParser(raw_output) statuses = list(parser.IterStatus()) code, bundle = parser.GetResult() return (code, bundle, statuses) def GenerateTestResults(result_code, result_bundle, statuses, duration_ms, device_abi, symbolizer): """Generate test results from |statuses|. Args: result_code: The overall status code as an integer. result_bundle: The summary bundle dump as a dict. statuses: A list of 2-tuples containing: - the status code as an integer - the bundle dump as a dict mapping string keys to string values Note that this is the same as the third item in the 3-tuple returned by |_ParseAmInstrumentRawOutput|. duration_ms: The duration of the test in milliseconds. device_abi: The device_abi, which is needed for symbolization. symbolizer: The symbolizer used to symbolize stack. Returns: A list containing an instance of InstrumentationTestResult for each test parsed. """ results = [] current_result = None cumulative_duration = 0 for status_code, bundle in statuses: # If the last test was a failure already, don't override that failure with # post-test failures that could be caused by the original failure. if (status_code == instrumentation_parser.STATUS_CODE_BATCH_FAILURE and current_result.GetType() != base_test_result.ResultType.FAIL): current_result.SetType(base_test_result.ResultType.FAIL) _MaybeSetLog(bundle, current_result, symbolizer, device_abi) continue if status_code == instrumentation_parser.STATUS_CODE_TEST_DURATION: # For the first result, duration will be set below to the difference # between the reported and actual durations to account for overhead like # starting instrumentation. if results: current_duration = int(bundle.get(_BUNDLE_DURATION_ID, duration_ms)) current_result.SetDuration(current_duration) cumulative_duration += current_duration continue test_class = bundle.get(_BUNDLE_CLASS_ID, '') test_method = bundle.get(_BUNDLE_TEST_ID, '') if test_class and test_method: test_name = '%s#%s' % (test_class, test_method) else: continue if status_code == instrumentation_parser.STATUS_CODE_START: if current_result: results.append(current_result) current_result = test_result.InstrumentationTestResult( test_name, base_test_result.ResultType.UNKNOWN, duration_ms) else: if status_code == instrumentation_parser.STATUS_CODE_OK: if bundle.get(_BUNDLE_SKIPPED_ID, '').lower() in ('true', '1', 'yes'): current_result.SetType(base_test_result.ResultType.SKIP) elif current_result.GetType() == base_test_result.ResultType.UNKNOWN: current_result.SetType(base_test_result.ResultType.PASS) elif status_code == instrumentation_parser.STATUS_CODE_SKIP: current_result.SetType(base_test_result.ResultType.SKIP) elif status_code == instrumentation_parser.STATUS_CODE_ASSUMPTION_FAILURE: current_result.SetType(base_test_result.ResultType.SKIP) else: if status_code not in (instrumentation_parser.STATUS_CODE_ERROR, instrumentation_parser.STATUS_CODE_FAILURE): logging.error('Unrecognized status code %d. Handling as an error.', status_code) current_result.SetType(base_test_result.ResultType.FAIL) _MaybeSetLog(bundle, current_result, symbolizer, device_abi) if current_result: if current_result.GetType() == base_test_result.ResultType.UNKNOWN: crashed = (result_code == _ACTIVITY_RESULT_CANCELED and any( _NATIVE_CRASH_RE.search(l) for l in six.itervalues(result_bundle))) if crashed: current_result.SetType(base_test_result.ResultType.CRASH) results.append(current_result) if results: logging.info('Adding cumulative overhead to test %s: %dms', results[0].GetName(), duration_ms - cumulative_duration) results[0].SetDuration(duration_ms - cumulative_duration) return results def _MaybeSetLog(bundle, current_result, symbolizer, device_abi): if _BUNDLE_STACK_ID in bundle: stack = bundle[_BUNDLE_STACK_ID] if symbolizer and device_abi: current_result.SetLog('%s\n%s' % (stack, '\n'.join( symbolizer.ExtractAndResolveNativeStackTraces(stack, device_abi)))) else: current_result.SetLog(stack) current_result.SetFailureReason(_ParseExceptionMessage(stack)) def _ParseExceptionMessage(stack): """Extracts the exception message from the given stack trace. """ # This interprets stack traces reported via InstrumentationResultPrinter: # https://source.chromium.org/chromium/chromium/src/+/main:third_party/android_support_test_runner/runner/src/main/java/android/support/test/internal/runner/listener/InstrumentationResultPrinter.java;l=181?q=InstrumentationResultPrinter&type=cs # This is a standard Java stack trace, of the form: # # at SomeClass.SomeMethod(...) # at ... lines = stack.split('\n') for i, line in enumerate(lines): if line.startswith('\tat'): return '\n'.join(lines[0:i]) # No call stack found, so assume everything is the exception message. return stack def FilterTests(tests, filter_strs=None, annotations=None, excluded_annotations=None): """Filter a list of tests Args: tests: a list of tests. e.g. [ {'annotations": {}, 'class': 'com.example.TestA', 'method':'test1'}, {'annotations": {}, 'class': 'com.example.TestB', 'method':'test2'}] filter_strs: list of googletest-style filter string. annotations: a dict of wanted annotations for test methods. excluded_annotations: a dict of annotations to exclude. Return: A list of filtered tests """ def test_names_from_pattern(combined_pattern, test_names): patterns = combined_pattern.split(':') hashable_patterns = set() filename_patterns = [] for pattern in patterns: if ('*' in pattern or '?' in pattern or '[' in pattern): filename_patterns.append(pattern) else: hashable_patterns.add(pattern) filter_test_names = set( unittest_util.FilterTestNames(test_names, ':'.join( filename_patterns))) if len(filename_patterns) > 0 else set() for test_name in test_names: if test_name in hashable_patterns: filter_test_names.add(test_name) return filter_test_names def get_test_names(test): test_names = set() # Allow fully-qualified name as well as an omitted package. unqualified_class_test = { 'class': test['class'].split('.')[-1], 'method': test['method'] } test_name = GetTestName(test, sep='.') test_names.add(test_name) unqualified_class_test_name = GetTestName(unqualified_class_test, sep='.') test_names.add(unqualified_class_test_name) unique_test_name = GetUniqueTestName(test, sep='.') test_names.add(unique_test_name) if test['is_junit4']: junit4_test_name = GetTestNameWithoutParameterPostfix(test, sep='.') test_names.add(junit4_test_name) unqualified_junit4_test_name = \ GetTestNameWithoutParameterPostfix(unqualified_class_test, sep='.') test_names.add(unqualified_junit4_test_name) return test_names def get_tests_from_names(tests, test_names, tests_to_names): ''' Returns the tests for which the given names apply Args: tests: a list of tests. e.g. [ {'annotations": {}, 'class': 'com.example.TestA', 'method':'test1'}, {'annotations": {}, 'class': 'com.example.TestB', 'method':'test2'}] test_names: a collection of names determining tests to return. Return: A list of tests that match the given test names ''' filtered_tests = [] for t in tests: current_test_names = tests_to_names[id(t)] for current_test_name in current_test_names: if current_test_name in test_names: filtered_tests.append(t) break return filtered_tests def remove_tests_from_names(tests, remove_test_names, tests_to_names): ''' Returns the tests from the given list with given names removed Args: tests: a list of tests. e.g. [ {'annotations": {}, 'class': 'com.example.TestA', 'method':'test1'}, {'annotations": {}, 'class': 'com.example.TestB', 'method':'test2'}] remove_test_names: a collection of names determining tests to remove. tests_to_names: a dcitionary of test ids to a collection of applicable names for that test Return: A list of tests that don't match the given test names ''' filtered_tests = [] for t in tests: for name in tests_to_names[id(t)]: if name in remove_test_names: break else: filtered_tests.append(t) return filtered_tests def gtests_filter(tests, combined_filters): ''' Returns the tests after the combined_filters have been applied Args: tests: a list of tests. e.g. [ {'annotations": {}, 'class': 'com.example.TestA', 'method':'test1'}, {'annotations": {}, 'class': 'com.example.TestB', 'method':'test2'}] combined_filters: the filter string representing tests to exclude Return: A list of tests that should still be included after the combined_filters are applied to their names ''' if not combined_filters: return tests # Collect all test names all_test_names = set() tests_to_names = {} for t in tests: tests_to_names[id(t)] = get_test_names(t) for name in tests_to_names[id(t)]: all_test_names.add(name) for combined_filter in combined_filters: pattern_groups = combined_filter.split('-') negative_pattern = pattern_groups[1] if len(pattern_groups) > 1 else None positive_pattern = pattern_groups[0] if positive_pattern: # Only use the test names that match the positive pattern positive_test_names = test_names_from_pattern(positive_pattern, all_test_names) tests = get_tests_from_names(tests, positive_test_names, tests_to_names) if negative_pattern: # Remove any test the negative filter matches remove_names = test_names_from_pattern(negative_pattern, all_test_names) tests = remove_tests_from_names(tests, remove_names, tests_to_names) return tests def annotation_filter(all_annotations): if not annotations: return True return any_annotation_matches(annotations, all_annotations) def excluded_annotation_filter(all_annotations): if not excluded_annotations: return True return not any_annotation_matches(excluded_annotations, all_annotations) def any_annotation_matches(filter_annotations, all_annotations): return any( ak in all_annotations and annotation_value_matches(av, all_annotations[ak]) for ak, av in filter_annotations) def annotation_value_matches(filter_av, av): if filter_av is None: return True if isinstance(av, dict): tav_from_dict = av['value'] # If tav_from_dict is an int, the 'in' operator breaks, so convert # filter_av and manually compare. See https://crbug.com/1019707 if isinstance(tav_from_dict, int): return int(filter_av) == tav_from_dict return filter_av in tav_from_dict if isinstance(av, list): return filter_av in av return filter_av == av return_tests = [] for t in gtests_filter(tests, filter_strs): # Enforce that all tests declare their size. if not any(a in _VALID_ANNOTATIONS for a in t['annotations']): raise MissingSizeAnnotationError(GetTestName(t)) if (not annotation_filter(t['annotations']) or not excluded_annotation_filter(t['annotations'])): continue return_tests.append(t) return return_tests def GetAllTestsFromApk(test_apk): pickle_path = '%s-dexdump.pickle' % test_apk try: tests = GetTestsFromPickle(pickle_path, os.path.getmtime(test_apk)) except TestListPickleException as e: logging.info('Could not get tests from pickle: %s', e) logging.info('Getting tests from dex via dexdump.') tests = _GetTestsFromDexdump(test_apk) SaveTestsToPickle(pickle_path, tests) return tests def GetTestsFromPickle(pickle_path, test_mtime): if not os.path.exists(pickle_path): raise TestListPickleException('%s does not exist.' % pickle_path) if os.path.getmtime(pickle_path) <= test_mtime: raise TestListPickleException('File is stale: %s' % pickle_path) with open(pickle_path, 'rb') as f: pickle_data = pickle.load(f) if pickle_data['VERSION'] != _PICKLE_FORMAT_VERSION: raise TestListPickleException('PICKLE_FORMAT_VERSION has changed.') return pickle_data['TEST_METHODS'] def _GetTestsFromDexdump(test_apk): dex_dumps = dexdump.Dump(test_apk) tests = [] def get_test_methods(methods, annotations): test_methods = [] for method in methods: if method.startswith('test'): method_annotations = annotations.get(method, {}) # Dexdump used to not return any annotation info # So MediumTest annotation was added to all methods # Preserving this behaviour by adding MediumTest if none of the # size annotations are included in these annotations if not any(valid in method_annotations for valid in _VALID_ANNOTATIONS): method_annotations.update({'MediumTest': None}) test_methods.append({ 'method': method, 'annotations': method_annotations }) return test_methods for dump in dex_dumps: for package_name, package_info in six.iteritems(dump): for class_name, class_info in six.iteritems(package_info['classes']): if class_name.endswith('Test') and not class_info['is_abstract']: classAnnotations, methodsAnnotations = class_info['annotations'] tests.append({ 'class': '%s.%s' % (package_name, class_name), 'annotations': classAnnotations, 'methods': get_test_methods(class_info['methods'], methodsAnnotations), 'superclass': class_info['superclass'], }) return tests def SaveTestsToPickle(pickle_path, tests): pickle_data = { 'VERSION': _PICKLE_FORMAT_VERSION, 'TEST_METHODS': tests, } with open(pickle_path, 'wb') as pickle_file: pickle.dump(pickle_data, pickle_file) class MissingJUnit4RunnerException(test_exception.TestException): """Raised when JUnit4 runner is not provided or specified in apk manifest""" def __init__(self): super().__init__( 'JUnit4 runner is not provided or specified in test apk manifest.') def GetTestName(test, sep='#'): """Gets the name of the given test. Note that this may return the same name for more than one test, e.g. if a test is being run multiple times with different parameters. Args: test: the instrumentation test dict. sep: the character(s) that should join the class name and the method name. Returns: The test name as a string. """ test_name = '%s%s%s' % (test['class'], sep, test['method']) assert ' *-:' not in test_name, ( 'The test name must not contain any of the characters in " *-:". See ' 'https://crbug.com/912199') return test_name def GetTestNameWithoutParameterPostfix( test, sep='#', parameterization_sep='__'): """Gets the name of the given JUnit4 test without parameter postfix. For most WebView JUnit4 javatests, each test is parameterizatized with "__sandboxed_mode" to run in both non-sandboxed mode and sandboxed mode. This function returns the name of the test without parameterization so test filters can match both parameterized and non-parameterized tests. Args: test: the instrumentation test dict. sep: the character(s) that should join the class name and the method name. parameterization_sep: the character(s) that seperate method name and method parameterization postfix. Returns: The test name without parameter postfix as a string. """ name = GetTestName(test, sep=sep) return name.split(parameterization_sep)[0] def GetUniqueTestName(test, sep='#'): """Gets the unique name of the given test. This will include text to disambiguate between tests for which GetTestName would return the same name. Args: test: the instrumentation test dict. sep: the character(s) that should join the class name and the method name. Returns: The unique test name as a string. """ display_name = GetTestName(test, sep=sep) if test.get('flags', [None])[0]: sanitized_flags = [x.replace('-', '_') for x in test['flags']] display_name = '%s_with_%s' % (display_name, '_'.join(sanitized_flags)) assert ' *-:' not in display_name, ( 'The test name must not contain any of the characters in " *-:". See ' 'https://crbug.com/912199') return display_name class InstrumentationTestInstance(test_instance.TestInstance): def __init__(self, args, data_deps_delegate, error_func): super().__init__() self._additional_apks = [] self._additional_apexs = [] self._forced_queryable_additional_apks = [] self._instant_additional_apks = [] self._apk_under_test = None self._apk_under_test_incremental_install_json = None self._modules = None self._fake_modules = None self._additional_locales = None self._package_info = None self._suite = None self._test_apk = None self._test_apk_as_instant = False self._test_apk_incremental_install_json = None self._test_package = None self._junit3_runner_class = None self._junit4_runner_class = None self._junit4_runner_supports_listing = None self._test_support_apk = None self._initializeApkAttributes(args, error_func) self._data_deps = None self._data_deps_delegate = None self._runtime_deps_path = None self._store_data_in_app_directory = False self._initializeDataDependencyAttributes(args, data_deps_delegate) self._annotations = None self._excluded_annotations = None self._test_filters = None self._initializeTestFilterAttributes(args) self._run_setup_commands = [] self._run_teardown_commands = [] self._initializeSetupTeardownCommandAttributes(args) self._flags = None self._use_apk_under_test_flags_file = False self._initializeFlagAttributes(args) self._screenshot_dir = None self._timeout_scale = None self._wait_for_java_debugger = None self._initializeTestControlAttributes(args) self._coverage_directory = None self._initializeTestCoverageAttributes(args) self._store_tombstones = False self._symbolizer = None self._enable_breakpad_dump = False self._proguard_mapping_path = None self._deobfuscator = None self._initializeLogAttributes(args) self._edit_shared_prefs = [] self._initializeEditPrefsAttributes(args) self._replace_system_package = None self._initializeReplaceSystemPackageAttributes(args) self._system_packages_to_remove = None self._initializeSystemPackagesToRemoveAttributes(args) self._use_voice_interaction_service = None self._initializeUseVoiceInteractionService(args) self._use_webview_provider = None self._initializeUseWebviewProviderAttributes(args) self._skia_gold_properties = None self._initializeSkiaGoldAttributes(args) self._test_launcher_batch_limit = None self._initializeTestLauncherAttributes(args) self._approve_app_links_domain = None self._approve_app_links_package = None self._initializeApproveAppLinksAttributes(args) self._wpr_enable_record = args.wpr_enable_record self._external_shard_index = args.test_launcher_shard_index self._total_external_shards = args.test_launcher_total_shards self._is_unit_test = False self._initializeUnitTestFlag(args) def _initializeApkAttributes(self, args, error_func): if args.apk_under_test: apk_under_test_path = args.apk_under_test if (not args.apk_under_test.endswith('.apk') and not args.apk_under_test.endswith('.apks')): apk_under_test_path = os.path.join( constants.GetOutDirectory(), constants.SDK_BUILD_APKS_DIR, '%s.apk' % args.apk_under_test) # TODO(jbudorick): Move the realpath up to the argument parser once # APK-by-name is no longer supported. apk_under_test_path = os.path.realpath(apk_under_test_path) if not os.path.exists(apk_under_test_path): error_func('Unable to find APK under test: %s' % apk_under_test_path) self._apk_under_test = apk_helper.ToHelper(apk_under_test_path) test_apk_path = args.test_apk if not os.path.exists(test_apk_path): test_apk_path = os.path.join( constants.GetOutDirectory(), constants.SDK_BUILD_APKS_DIR, '%s.apk' % args.test_apk) # TODO(jbudorick): Move the realpath up to the argument parser once # APK-by-name is no longer supported. test_apk_path = os.path.realpath(test_apk_path) if not os.path.exists(test_apk_path): error_func('Unable to find test APK: %s' % test_apk_path) self._test_apk = apk_helper.ToHelper(test_apk_path) self._suite = os.path.splitext(os.path.basename(args.test_apk))[0] self._test_apk_as_instant = args.test_apk_as_instant self._apk_under_test_incremental_install_json = ( args.apk_under_test_incremental_install_json) self._test_apk_incremental_install_json = ( args.test_apk_incremental_install_json) if self._test_apk_incremental_install_json: assert self._suite.endswith('_incremental') self._suite = self._suite[:-len('_incremental')] self._modules = args.modules self._fake_modules = args.fake_modules self._additional_locales = args.additional_locales self._test_support_apk = apk_helper.ToHelper(os.path.join( constants.GetOutDirectory(), constants.SDK_BUILD_TEST_JAVALIB_DIR, '%sSupport.apk' % self._suite)) self._test_package = self._test_apk.GetPackageName() all_instrumentations = self._test_apk.GetAllInstrumentations() all_junit3_runner_classes = [ x for x in all_instrumentations if ('0xffffffff' in x.get( 'chromium-junit3', ''))] all_junit4_runner_classes = [ x for x in all_instrumentations if ('0xffffffff' not in x.get( 'chromium-junit3', ''))] if len(all_junit3_runner_classes) > 1: logging.warning('This test apk has more than one JUnit3 instrumentation') if len(all_junit4_runner_classes) > 1: logging.warning('This test apk has more than one JUnit4 instrumentation') self._junit3_runner_class = ( all_junit3_runner_classes[0]['android:name'] if all_junit3_runner_classes else self.test_apk.GetInstrumentationName()) self._junit4_runner_class = ( all_junit4_runner_classes[0]['android:name'] if all_junit4_runner_classes else None) if self._junit4_runner_class: if self._test_apk_incremental_install_json: self._junit4_runner_supports_listing = next( (True for x in self._test_apk.GetAllMetadata() if 'real-instr' in x[0] and x[1] in _TEST_LIST_JUNIT4_RUNNERS), False) else: self._junit4_runner_supports_listing = ( self._junit4_runner_class in _TEST_LIST_JUNIT4_RUNNERS) self._package_info = None if self._apk_under_test: package_under_test = self._apk_under_test.GetPackageName() for package_info in six.itervalues(constants.PACKAGE_INFO): if package_under_test == package_info.package: self._package_info = package_info break if not self._package_info: logging.warning( 'Unable to find package info for %s. ' '(This may just mean that the test package is ' 'currently being installed.)', self._test_package) for x in set(args.additional_apks + args.forced_queryable_additional_apks + args.instant_additional_apks): if not os.path.exists(x): error_func('Unable to find additional APK: %s' % x) apk = apk_helper.ToHelper(x) self._additional_apks.append(apk) if x in args.forced_queryable_additional_apks: self._forced_queryable_additional_apks.append(apk) if x in args.instant_additional_apks: self._instant_additional_apks.append(apk) self._additional_apexs = args.additional_apexs def _initializeDataDependencyAttributes(self, args, data_deps_delegate): self._data_deps = [] self._data_deps_delegate = data_deps_delegate self._runtime_deps_path = args.runtime_deps_path self._store_data_in_app_directory = args.store_data_in_app_directory if not self._runtime_deps_path: logging.warning('No data dependencies will be pushed.') def _initializeTestFilterAttributes(self, args): self._test_filters = test_filter.InitializeFiltersFromArgs(args) def annotation_element(a): a = a.split('=', 1) return (a[0], a[1] if len(a) == 2 else None) if args.annotation_str: self._annotations = [ annotation_element(a) for a in args.annotation_str.split(',')] elif not self._test_filters: self._annotations = [ annotation_element(a) for a in _DEFAULT_ANNOTATIONS] else: self._annotations = [] if args.exclude_annotation_str: self._excluded_annotations = [ annotation_element(a) for a in args.exclude_annotation_str.split(',')] else: self._excluded_annotations = [] requested_annotations = set(a[0] for a in self._annotations) if args.run_disabled: self._excluded_annotations.extend( annotation_element(a) for a in _DO_NOT_REVIVE_ANNOTATIONS if a not in requested_annotations) else: self._excluded_annotations.extend( annotation_element(a) for a in _EXCLUDE_UNLESS_REQUESTED_ANNOTATIONS if a not in requested_annotations) def _initializeSetupTeardownCommandAttributes(self, args): self._run_setup_commands = args.run_setup_commands self._run_teardown_commands = args.run_teardown_commands def _initializeFlagAttributes(self, args): self._use_apk_under_test_flags_file = args.use_apk_under_test_flags_file self._flags = ['--enable-test-intents'] if args.command_line_flags: self._flags.extend(args.command_line_flags) if args.device_flags_file: with open(args.device_flags_file) as device_flags_file: stripped_lines = (l.strip() for l in device_flags_file) self._flags.extend(flag for flag in stripped_lines if flag) if args.strict_mode and args.strict_mode != 'off' and ( # TODO(yliuyliu): Turn on strict mode for coverage once # crbug/1006397 is fixed. not args.coverage_dir): self._flags.append('--strict-mode=' + args.strict_mode) def _initializeTestControlAttributes(self, args): self._screenshot_dir = args.screenshot_dir self._timeout_scale = args.timeout_scale or 1 self._wait_for_java_debugger = args.wait_for_java_debugger def _initializeTestCoverageAttributes(self, args): self._coverage_directory = args.coverage_dir def _initializeLogAttributes(self, args): self._enable_breakpad_dump = args.enable_breakpad_dump self._proguard_mapping_path = args.proguard_mapping_path self._store_tombstones = args.store_tombstones self._symbolizer = stack_symbolizer.Symbolizer( self.apk_under_test.path if self.apk_under_test else None) def _initializeEditPrefsAttributes(self, args): if not hasattr(args, 'shared_prefs_file') or not args.shared_prefs_file: return if not isinstance(args.shared_prefs_file, str): logging.warning("Given non-string for a filepath") return self._edit_shared_prefs = shared_preference_utils.ExtractSettingsFromJson( args.shared_prefs_file) def _initializeReplaceSystemPackageAttributes(self, args): if (not hasattr(args, 'replace_system_package') or not args.replace_system_package): return self._replace_system_package = args.replace_system_package def _initializeSystemPackagesToRemoveAttributes(self, args): if (not hasattr(args, 'system_packages_to_remove') or not args.system_packages_to_remove): return self._system_packages_to_remove = args.system_packages_to_remove def _initializeUseVoiceInteractionService(self, args): if (not hasattr(args, 'use_voice_interaction_service') or not args.use_voice_interaction_service): return self._use_voice_interaction_service = args.use_voice_interaction_service def _initializeUseWebviewProviderAttributes(self, args): if (not hasattr(args, 'use_webview_provider') or not args.use_webview_provider): return self._use_webview_provider = args.use_webview_provider def _initializeSkiaGoldAttributes(self, args): self._skia_gold_properties = gold_utils.AndroidSkiaGoldProperties(args) def _initializeTestLauncherAttributes(self, args): if hasattr(args, 'test_launcher_batch_limit'): self._test_launcher_batch_limit = args.test_launcher_batch_limit def _initializeApproveAppLinksAttributes(self, args): if (not hasattr(args, 'approve_app_links') or not args.approve_app_links): return # The argument will be formatted as com.android.thing:www.example.com . app_links = args.approve_app_links.split(':') if (len(app_links) != 2 or not app_links[0] or not app_links[1]): logging.warning('--approve_app_links option provided, but malformed.') return self._approve_app_links_package = app_links[0] self._approve_app_links_domain = app_links[1] def _initializeUnitTestFlag(self, args): self._is_unit_test = args.is_unit_test @property def additional_apks(self): return self._additional_apks @property def additional_apexs(self): return self._additional_apexs @property def apk_under_test(self): return self._apk_under_test @property def apk_under_test_incremental_install_json(self): return self._apk_under_test_incremental_install_json @property def approve_app_links_package(self): return self._approve_app_links_package @property def approve_app_links_domain(self): return self._approve_app_links_domain @property def modules(self): return self._modules @property def fake_modules(self): return self._fake_modules @property def additional_locales(self): return self._additional_locales @property def coverage_directory(self): return self._coverage_directory @property def edit_shared_prefs(self): return self._edit_shared_prefs @property def enable_breakpad_dump(self): return self._enable_breakpad_dump @property def external_shard_index(self): return self._external_shard_index @property def flags(self): return self._flags @property def is_unit_test(self): return self._is_unit_test @property def junit3_runner_class(self): return self._junit3_runner_class @property def junit4_runner_class(self): return self._junit4_runner_class @property def junit4_runner_supports_listing(self): return self._junit4_runner_supports_listing @property def package_info(self): return self._package_info @property def replace_system_package(self): return self._replace_system_package @property def run_setup_commands(self): return self._run_setup_commands @property def run_teardown_commands(self): return self._run_teardown_commands @property def use_voice_interaction_service(self): return self._use_voice_interaction_service @property def use_webview_provider(self): return self._use_webview_provider @property def screenshot_dir(self): return self._screenshot_dir @property def skia_gold_properties(self): return self._skia_gold_properties @property def store_data_in_app_directory(self): return self._store_data_in_app_directory @property def store_tombstones(self): return self._store_tombstones @property def suite(self): return self._suite @property def symbolizer(self): return self._symbolizer @property def system_packages_to_remove(self): return self._system_packages_to_remove @property def test_apk(self): return self._test_apk @property def test_apk_as_instant(self): return self._test_apk_as_instant @property def test_apk_incremental_install_json(self): return self._test_apk_incremental_install_json @property def test_filters(self): return self._test_filters @property def test_launcher_batch_limit(self): return self._test_launcher_batch_limit @property def test_support_apk(self): return self._test_support_apk @property def test_package(self): return self._test_package @property def timeout_scale(self): return self._timeout_scale @property def total_external_shards(self): return self._total_external_shards @property def use_apk_under_test_flags_file(self): return self._use_apk_under_test_flags_file @property def wait_for_java_debugger(self): return self._wait_for_java_debugger @property def wpr_record_mode(self): return self._wpr_enable_record @property def wpr_replay_mode(self): return not self._wpr_enable_record #override def TestType(self): return 'instrumentation' #override def GetPreferredAbis(self): # We could alternatively take the intersection of what they all support, # but it should never be the case that they support different things. apks = [self._test_apk, self._apk_under_test] + self._additional_apks for apk in apks: if apk: ret = apk.GetAbis() if ret: return ret return [] #override def SetUp(self): self._data_deps.extend( self._data_deps_delegate(self._runtime_deps_path)) if self._proguard_mapping_path: self._deobfuscator = deobfuscator.DeobfuscatorPool( self._proguard_mapping_path) def GetDataDependencies(self): return self._data_deps def GetTests(self): if self._test_apk_incremental_install_json: # Would likely just be a matter of calling GetAllTestsFromApk on all # .dex files listed in the .json. raise Exception('Support not implemented for incremental_install=true on ' 'tests that do not use //base\'s test runner.') raw_tests = GetAllTestsFromApk(self.test_apk.path) return self.ProcessRawTests(raw_tests) def MaybeDeobfuscateLines(self, lines): if not self._deobfuscator: return lines return self._deobfuscator.TransformLines(lines) def ProcessRawTests(self, raw_tests): inflated_tests = self._ParameterizeTestsWithFlags( self._InflateTests(raw_tests)) if self._junit4_runner_class is None and any( t['is_junit4'] for t in inflated_tests): raise MissingJUnit4RunnerException() filtered_tests = FilterTests(inflated_tests, self._test_filters, self._annotations, self._excluded_annotations) if self._test_filters and not filtered_tests: for t in inflated_tests: logging.debug(' %s', GetUniqueTestName(t)) logging.warning('Unmatched Filters: %s', self._test_filters) return filtered_tests def IsApkForceQueryable(self, apk): return apk in self._forced_queryable_additional_apks def IsApkInstant(self, apk): return apk in self._instant_additional_apks # pylint: disable=no-self-use def _InflateTests(self, tests): inflated_tests = [] for c in tests: for m in c['methods']: a = dict(c['annotations']) a.update(m['annotations']) inflated_tests.append({ 'class': c['class'], 'method': m['method'], 'annotations': a, # TODO(https://crbug.com/1084729): Remove is_junit4. 'is_junit4': True }) return inflated_tests def _ParameterizeTestsWithFlags(self, tests): def _checkParameterization(annotations): types = [ _PARAMETERIZED_COMMAND_LINE_FLAGS_SWITCHES, _PARAMETERIZED_COMMAND_LINE_FLAGS, ] if types[0] in annotations and types[1] in annotations: raise CommandLineParameterizationException( 'Multiple command-line parameterization types: {}.'.format( ', '.join(types))) def _switchesToFlags(switches): return ['--{}'.format(s) for s in switches if s] def _annotationToSwitches(clazz, methods): if clazz == _PARAMETERIZED_COMMAND_LINE_FLAGS_SWITCHES: return [methods['value']] if clazz == _PARAMETERIZED_COMMAND_LINE_FLAGS: list_of_switches = [] for annotation in methods['value']: for c, m in six.iteritems(annotation): list_of_switches += _annotationToSwitches(c, m) return list_of_switches return [] def _setTestFlags(test, flags): if flags: test['flags'] = flags elif 'flags' in test: del test['flags'] new_tests = [] for t in tests: annotations = t['annotations'] list_of_switches = [] _checkParameterization(annotations) if _SKIP_PARAMETERIZATION not in annotations: for clazz, methods in six.iteritems(annotations): list_of_switches += _annotationToSwitches(clazz, methods) if list_of_switches: _setTestFlags(t, _switchesToFlags(list_of_switches[0])) for p in list_of_switches[1:]: parameterized_t = copy.copy(t) _setTestFlags(parameterized_t, _switchesToFlags(p)) new_tests.append(parameterized_t) return tests + new_tests @staticmethod def ParseAmInstrumentRawOutput(raw_output): return ParseAmInstrumentRawOutput(raw_output) @staticmethod def GenerateTestResults(result_code, result_bundle, statuses, duration_ms, device_abi, symbolizer): return GenerateTestResults(result_code, result_bundle, statuses, duration_ms, device_abi, symbolizer) #override def TearDown(self): self.symbolizer.CleanUp() if self._deobfuscator: self._deobfuscator.Close() self._deobfuscator = None