1# Copyright (c) 2013 The Chromium Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5"""Helper class for instrumenation test jar.""" 6# pylint: disable=W0702 7 8import logging 9import os 10import pickle 11import re 12import sys 13import tempfile 14 15from pylib import cmd_helper 16from pylib import constants 17from pylib.device import device_utils 18 19sys.path.insert(0, 20 os.path.join(constants.DIR_SOURCE_ROOT, 21 'build', 'util', 'lib', 'common')) 22 23import unittest_util # pylint: disable=F0401 24 25# If you change the cached output of proguard, increment this number 26PICKLE_FORMAT_VERSION = 2 27 28 29class TestJar(object): 30 _ANNOTATIONS = frozenset( 31 ['Smoke', 'SmallTest', 'MediumTest', 'LargeTest', 'EnormousTest', 32 'FlakyTest', 'DisabledTest', 'Manual', 'PerfTest', 'HostDrivenTest', 33 'IntegrationTest']) 34 _DEFAULT_ANNOTATION = 'SmallTest' 35 _PROGUARD_CLASS_RE = re.compile(r'\s*?- Program class:\s*([\S]+)$') 36 _PROGUARD_SUPERCLASS_RE = re.compile(r'\s*? Superclass:\s*([\S]+)$') 37 _PROGUARD_METHOD_RE = re.compile(r'\s*?- Method:\s*(\S*)[(].*$') 38 _PROGUARD_ANNOTATION_RE = re.compile(r'\s*?- Annotation \[L(\S*);\]:$') 39 _PROGUARD_ANNOTATION_CONST_RE = ( 40 re.compile(r'\s*?- Constant element value.*$')) 41 _PROGUARD_ANNOTATION_VALUE_RE = re.compile(r'\s*?- \S+? \[(.*)\]$') 42 43 def __init__(self, jar_path): 44 if not os.path.exists(jar_path): 45 raise Exception('%s not found, please build it' % jar_path) 46 47 self._PROGUARD_PATH = os.path.join(constants.ANDROID_SDK_ROOT, 48 'tools/proguard/lib/proguard.jar') 49 if not os.path.exists(self._PROGUARD_PATH): 50 self._PROGUARD_PATH = os.path.join(os.environ['ANDROID_BUILD_TOP'], 51 'external/proguard/lib/proguard.jar') 52 self._jar_path = jar_path 53 self._pickled_proguard_name = self._jar_path + '-proguard.pickle' 54 self._test_methods = {} 55 if not self._GetCachedProguardData(): 56 self._GetProguardData() 57 58 def _GetCachedProguardData(self): 59 if (os.path.exists(self._pickled_proguard_name) and 60 (os.path.getmtime(self._pickled_proguard_name) > 61 os.path.getmtime(self._jar_path))): 62 logging.info('Loading cached proguard output from %s', 63 self._pickled_proguard_name) 64 try: 65 with open(self._pickled_proguard_name, 'r') as r: 66 d = pickle.loads(r.read()) 67 if d['VERSION'] == PICKLE_FORMAT_VERSION: 68 self._test_methods = d['TEST_METHODS'] 69 return True 70 except: 71 logging.warning('PICKLE_FORMAT_VERSION has changed, ignoring cache') 72 return False 73 74 def _GetProguardData(self): 75 logging.info('Retrieving test methods via proguard.') 76 77 with tempfile.NamedTemporaryFile() as proguard_output: 78 cmd_helper.RunCmd(['java', '-jar', 79 self._PROGUARD_PATH, 80 '-injars', self._jar_path, 81 '-dontshrink', 82 '-dontoptimize', 83 '-dontobfuscate', 84 '-dontpreverify', 85 '-dump', proguard_output.name]) 86 87 clazzez = {} 88 89 annotation = None 90 annotation_has_value = False 91 clazz = None 92 method = None 93 94 for line in proguard_output: 95 if len(line) == 0: 96 annotation = None 97 annotation_has_value = False 98 method = None 99 continue 100 101 m = self._PROGUARD_CLASS_RE.match(line) 102 if m: 103 clazz = m.group(1).replace('/', '.') 104 clazzez[clazz] = { 105 'methods': {}, 106 'annotations': {} 107 } 108 annotation = None 109 annotation_has_value = False 110 method = None 111 continue 112 113 if not clazz: 114 continue 115 116 m = self._PROGUARD_SUPERCLASS_RE.match(line) 117 if m: 118 clazzez[clazz]['superclass'] = m.group(1).replace('/', '.') 119 continue 120 121 if clazz.endswith('Test'): 122 m = self._PROGUARD_METHOD_RE.match(line) 123 if m: 124 method = m.group(1) 125 clazzez[clazz]['methods'][method] = {'annotations': {}} 126 annotation = None 127 annotation_has_value = False 128 continue 129 130 m = self._PROGUARD_ANNOTATION_RE.match(line) 131 if m: 132 # Ignore the annotation package. 133 annotation = m.group(1).split('/')[-1] 134 if method: 135 clazzez[clazz]['methods'][method]['annotations'][annotation] = None 136 else: 137 clazzez[clazz]['annotations'][annotation] = None 138 continue 139 140 if annotation: 141 if not annotation_has_value: 142 m = self._PROGUARD_ANNOTATION_CONST_RE.match(line) 143 annotation_has_value = bool(m) 144 else: 145 m = self._PROGUARD_ANNOTATION_VALUE_RE.match(line) 146 if m: 147 if method: 148 clazzez[clazz]['methods'][method]['annotations'][annotation] = ( 149 m.group(1)) 150 else: 151 clazzez[clazz]['annotations'][annotation] = m.group(1) 152 annotation_has_value = None 153 154 test_clazzez = ((n, i) for n, i in clazzez.items() if n.endswith('Test')) 155 for clazz_name, clazz_info in test_clazzez: 156 logging.info('Processing %s' % clazz_name) 157 c = clazz_name 158 min_sdk_level = None 159 160 while c in clazzez: 161 c_info = clazzez[c] 162 if not min_sdk_level: 163 min_sdk_level = c_info['annotations'].get('MinAndroidSdkLevel', None) 164 c = c_info.get('superclass', None) 165 166 for method_name, method_info in clazz_info['methods'].items(): 167 if method_name.startswith('test'): 168 qualified_method = '%s#%s' % (clazz_name, method_name) 169 method_annotations = [] 170 for annotation_name, annotation_value in ( 171 method_info['annotations'].items()): 172 method_annotations.append(annotation_name) 173 if annotation_value: 174 method_annotations.append( 175 annotation_name + ':' + annotation_value) 176 self._test_methods[qualified_method] = { 177 'annotations': method_annotations 178 } 179 if min_sdk_level is not None: 180 self._test_methods[qualified_method]['min_sdk_level'] = ( 181 int(min_sdk_level)) 182 183 logging.info('Storing proguard output to %s', self._pickled_proguard_name) 184 d = {'VERSION': PICKLE_FORMAT_VERSION, 185 'TEST_METHODS': self._test_methods} 186 with open(self._pickled_proguard_name, 'w') as f: 187 f.write(pickle.dumps(d)) 188 189 190 @staticmethod 191 def _IsTestMethod(test): 192 class_name, method = test.split('#') 193 return class_name.endswith('Test') and method.startswith('test') 194 195 def GetTestAnnotations(self, test): 196 """Returns a list of all annotations for the given |test|. May be empty.""" 197 if not self._IsTestMethod(test) or not test in self._test_methods: 198 return [] 199 return self._test_methods[test]['annotations'] 200 201 @staticmethod 202 def _AnnotationsMatchFilters(annotation_filter_list, annotations): 203 """Checks if annotations match any of the filters.""" 204 if not annotation_filter_list: 205 return True 206 for annotation_filter in annotation_filter_list: 207 filters = annotation_filter.split('=') 208 if len(filters) == 2: 209 key = filters[0] 210 value_list = filters[1].split(',') 211 for value in value_list: 212 if key + ':' + value in annotations: 213 return True 214 elif annotation_filter in annotations: 215 return True 216 return False 217 218 def GetAnnotatedTests(self, annotation_filter_list): 219 """Returns a list of all tests that match the given annotation filters.""" 220 return [test for test, attr in self.GetTestMethods().iteritems() 221 if self._IsTestMethod(test) and self._AnnotationsMatchFilters( 222 annotation_filter_list, attr['annotations'])] 223 224 def GetTestMethods(self): 225 """Returns a dict of all test methods and relevant attributes. 226 227 Test methods are retrieved as Class#testMethod. 228 """ 229 return self._test_methods 230 231 def _GetTestsMissingAnnotation(self): 232 """Get a list of test methods with no known annotations.""" 233 tests_missing_annotations = [] 234 for test_method in self.GetTestMethods().iterkeys(): 235 annotations_ = frozenset(self.GetTestAnnotations(test_method)) 236 if (annotations_.isdisjoint(self._ANNOTATIONS) and 237 not self.IsHostDrivenTest(test_method)): 238 tests_missing_annotations.append(test_method) 239 return sorted(tests_missing_annotations) 240 241 def _IsTestValidForSdkRange(self, test_name, attached_min_sdk_level): 242 required_min_sdk_level = self.GetTestMethods()[test_name].get( 243 'min_sdk_level', None) 244 return (required_min_sdk_level is None or 245 attached_min_sdk_level >= required_min_sdk_level) 246 247 def GetAllMatchingTests(self, annotation_filter_list, 248 exclude_annotation_list, test_filter): 249 """Get a list of tests matching any of the annotations and the filter. 250 251 Args: 252 annotation_filter_list: List of test annotations. A test must have at 253 least one of these annotations. A test without any annotations is 254 considered to be SmallTest. 255 exclude_annotation_list: List of test annotations. A test must not have 256 any of these annotations. 257 test_filter: Filter used for partial matching on the test method names. 258 259 Returns: 260 List of all matching tests. 261 """ 262 if annotation_filter_list: 263 available_tests = self.GetAnnotatedTests(annotation_filter_list) 264 # Include un-annotated tests in SmallTest. 265 if annotation_filter_list.count(self._DEFAULT_ANNOTATION) > 0: 266 for test in self._GetTestsMissingAnnotation(): 267 logging.warning( 268 '%s has no annotations. Assuming "%s".', test, 269 self._DEFAULT_ANNOTATION) 270 available_tests.append(test) 271 if exclude_annotation_list: 272 excluded_tests = self.GetAnnotatedTests(exclude_annotation_list) 273 available_tests = list(set(available_tests) - set(excluded_tests)) 274 else: 275 available_tests = [m for m in self.GetTestMethods() 276 if not self.IsHostDrivenTest(m)] 277 278 tests = [] 279 if test_filter: 280 # |available_tests| are in adb instrument format: package.path.class#test. 281 282 # Maps a 'class.test' name to each 'package.path.class#test' name. 283 sanitized_test_names = dict([ 284 (t.split('.')[-1].replace('#', '.'), t) for t in available_tests]) 285 # Filters 'class.test' names and populates |tests| with the corresponding 286 # 'package.path.class#test' names. 287 tests = [ 288 sanitized_test_names[t] for t in unittest_util.FilterTestNames( 289 sanitized_test_names.keys(), test_filter.replace('#', '.'))] 290 else: 291 tests = available_tests 292 293 # Filter out any tests with SDK level requirements that don't match the set 294 # of attached devices. 295 sdk_versions = [ 296 int(v) for v in 297 device_utils.DeviceUtils.parallel().GetProp( 298 'ro.build.version.sdk').pGet(None)] 299 tests = filter( 300 lambda t: self._IsTestValidForSdkRange(t, min(sdk_versions)), 301 tests) 302 303 return tests 304 305 @staticmethod 306 def IsHostDrivenTest(test): 307 return 'pythonDrivenTests' in test 308