#
# Copyright (C) 2016 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import io
import logging
import os
import shutil
import time
import zipfile

from google.protobuf.internal.containers import RepeatedCompositeFieldContainer

from vts.proto import VtsReportMessage_pb2 as ReportMsg
from vts.runners.host import keys
from vts.utils.python.archive import archive_parser
from vts.utils.python.build.api import artifact_fetcher
from vts.utils.python.coverage import coverage_report
from vts.utils.python.coverage import gcda_parser
from vts.utils.python.coverage import gcno_parser
from vts.utils.python.coverage.parser import FileFormatError
from vts.utils.python.web import feature_utils

TARGET_COVERAGE_PATH = "/data/misc/gcov/"
LOCAL_COVERAGE_PATH = "/tmp/vts-test-coverage"

GCNO_SUFFIX = ".gcno"
GCDA_SUFFIX = ".gcda"
COVERAGE_SUFFIX = ".gcnodir"
GIT_PROJECT = "git_project"
MODULE_NAME = "module_name"
NAME = "name"
PATH = "path"

_BRANCH = "master"  # TODO: make this a runtime parameter
_CHECKSUM_GCNO_DICT = "checksum_gcno_dict"
_COVERAGE_ZIP = "coverage_zip"
_REVISION_DICT = "revision_dict"


class CoverageFeature(feature_utils.Feature):
    """Feature object for coverage functionality.

    Attributes:
        enabled: boolean, True if coverage is enabled, False otherwise
        web: (optional) WebFeature, object storing web feature util for test run
        local_coverage_path: path to store the coverage files.
        global_coverage: value of the global-coverage config parameter,
                         True when the parameter was not provided.
    """

    _TOGGLE_PARAM = keys.ConfigKeys.IKEY_ENABLE_COVERAGE
    _REQUIRED_PARAMS = [
        keys.ConfigKeys.IKEY_ANDROID_DEVICE,
        keys.ConfigKeys.IKEY_SERVICE_JSON_PATH,
    ]
    _OPTIONAL_PARAMS = [
        keys.ConfigKeys.IKEY_MODULES,
        keys.ConfigKeys.IKEY_OUTPUT_COVERAGE_REPORT,
        keys.ConfigKeys.IKEY_GLOBAL_COVERAGE,
    ]

    def __init__(self, user_params, web=None, local_coverage_path=None):
        """Initializes the coverage feature.

        Args:
            user_params: A dictionary from parameter name (String) to parameter
                         value.
            web: (optional) WebFeature, object storing web feature util for
                 test run
            local_coverage_path: (optional) path to store the .gcda files and
                                 coverage reports. When omitted, a fresh
                                 directory under LOCAL_COVERAGE_PATH is used.
        """
        self.ParseParameters(self._TOGGLE_PARAM, self._REQUIRED_PARAMS,
                             self._OPTIONAL_PARAMS, user_params)
        self.web = web
        if local_coverage_path:
            self.local_coverage_path = local_coverage_path
        else:
            # The value is in microseconds (seconds * 1e6), which keeps the
            # per-run directory names distinct.
            timestamp_micros = str(int(time.time() * 1000000))
            self.local_coverage_path = os.path.join(LOCAL_COVERAGE_PATH,
                                                    timestamp_micros)
            if os.path.exists(self.local_coverage_path):
                logging.info("removing existing coverage path: %s",
                             self.local_coverage_path)
                shutil.rmtree(self.local_coverage_path)
        # NOTE(review): only the auto-generated directory is ever wiped; a
        # caller-supplied directory is created if missing but its existing
        # contents are left untouched -- confirm this matches expectations.
        if not os.path.isdir(self.local_coverage_path):
            os.makedirs(self.local_coverage_path)

        self.global_coverage = getattr(
            self, keys.ConfigKeys.IKEY_GLOBAL_COVERAGE, True)
        logging.info("Coverage enabled: %s", self.enabled)

    def _ExtractSourceName(self, gcno_summary, file_name):
        """Gets the source name from the GCNO summary object.

        Gets the original source file name from the FileSummary object
        describing a gcno file using the base filename of the gcno/gcda file.

        Args:
            gcno_summary: a FileSummary object describing a gcno file
            file_name: the base filename (without extensions) of the gcno or
                       gcda file

        Returns:
            The source file path recorded for the first function whose source
            has a supported extension (c, cpp, cc) and whose extension-less
            path ends with file_name. If no function matches, the last source
            path examined is returned; None if the summary has no functions.
        """
        src_file_path = None
        for key in gcno_summary.functions:
            src_file_path = gcno_summary.functions[key].src_file_name
            src_parts = src_file_path.rsplit(".", 1)
            src_file_name = src_parts[0]
            src_extension = src_parts[1] if len(src_parts) > 1 else None
            if src_extension not in ("c", "cpp", "cc"):
                logging.warning("Found unsupported file type: %s",
                                src_file_path)
                continue
            if src_file_name.endswith(file_name):
                logging.info("Coverage source file: %s", src_file_path)
                break
        # Deliberately left as-is: when the loop ends without a match, the
        # most recently seen path is still returned to the caller.
        return src_file_path

    def _GetChecksumGcnoDict(self, cov_zip):
        """Generates a dictionary from gcno checksum to GCNOParser object.

        Processes the gcnodir files in the zip file to produce a mapping from
        gcno checksum to the GCNOParser object wrapping the gcno content.
        Archives that fail to parse are logged and skipped.

        Args:
            cov_zip: the zip file containing gcnodir files from the device
                     build

        Returns:
            the dictionary of gcno checksums to GCNOParser objects
        """
        checksum_gcno_dict = {}
        instrumented_modules = [
            f for f in cov_zip.namelist() if f.endswith(COVERAGE_SUFFIX)
        ]
        for instrumented_module in instrumented_modules:
            # Read the gcnodir file
            archive = archive_parser.Archive(
                cov_zip.open(instrumented_module).read())
            try:
                archive.Parse()
            except ValueError:
                logging.error("Archive could not be parsed: %s",
                              instrumented_module)
                continue

            for gcno_file_path in archive.files:
                gcno_stream = io.BytesIO(archive.files[gcno_file_path])
                gcno_file_parser = gcno_parser.GCNOParser(gcno_stream)
                checksum_gcno_dict[
                    gcno_file_parser.checksum] = gcno_file_parser
        return checksum_gcno_dict

    def InitializeDeviceCoverage(self, dut):
        """Initializes the device for coverage before tests run.

        Finds and removes all gcda files under TARGET_COVERAGE_PATH before
        tests run.

        Args:
            dut: the device under test.
        """
        logging.info("Removing existing gcda files.")
        dut.adb.shell("find %s -name \"*.gcda\" -type f -delete" %
                      TARGET_COVERAGE_PATH)

    def GetGcdaDict(self, dut):
        """Retrieves GCDA files from device and creates a dictionary of files.

        Finds all GCDA files under TARGET_COVERAGE_PATH on the target device,
        copies them into self.local_coverage_path on the host using adb, then
        returns a dictionary mapping each gcda basename to the file content.

        Args:
            dut: the device under test.

        Returns:
            A dictionary with gcda basenames as keys and contents as the
            values.
        """
        logging.info("Creating gcda dictionary")
        gcda_dict = {}
        logging.info("Storing gcda tmp files to: %s", self.local_coverage_path)
        gcda_files = dut.adb.shell("find %s -name \"*.gcda\"" %
                                   TARGET_COVERAGE_PATH).split("\n")
        for gcda in gcda_files:
            # Strip before use so stray whitespace / carriage returns in the
            # shell output neither reach the pull command nor yield an empty
            # basename.
            gcda_path = gcda.strip()
            if not gcda_path:
                continue
            basename = os.path.basename(gcda_path)
            file_name = os.path.join(self.local_coverage_path, basename)
            dut.adb.pull("%s %s" % (gcda_path, file_name))
            with open(file_name, "rb") as gcda_file:
                gcda_dict[basename] = gcda_file.read()
        return gcda_dict

    def _OutputCoverageReport(self, isGlobal):
        """Writes the collected coverage messages to a local text file.

        Copies the coverage entries of the web feature's report message into a
        new TestReportMessage and writes its string form to a timestamped file
        under self.local_coverage_path.

        Args:
            isGlobal: boolean, True to dump the coverage of the whole test run
                      (web.report_msg), False to dump the coverage of the
                      current test case (web.current_test_report_msg).
        """
        logging.info("outputing coverage data")
        if not self.web:
            # The coverage entries live on the web feature; without it there
            # is nothing to dump.
            logging.warning(
                "Cannot output coverage report: web feature not available")
            return

        timestamp_micros = str(int(time.time() * 1000000))
        coverage_report_file = os.path.join(
            self.local_coverage_path,
            "coverage_report_" + timestamp_micros + ".txt")
        logging.info("Storing coverage report to: %s", coverage_report_file)

        if isGlobal:
            source_msg = self.web.report_msg
        else:
            source_msg = self.web.current_test_report_msg

        coverage_report_msg = ReportMsg.TestReportMessage()
        for c in source_msg.coverage:
            coverage = coverage_report_msg.coverage.add()
            coverage.CopyFrom(c)
        with open(coverage_report_file, 'w+') as f:
            f.write(str(coverage_report_msg))

    def _AutoProcess(self, gcda_dict, isGlobal):
        """Process coverage data and appends coverage reports to the report message.

        Matches gcno files with gcda files and processes them into a coverage
        report with references to the original source code used to build the
        system image. Coverage information is appended as a
        CoverageReportMessage to the provided report message.

        Git project information is automatically extracted from the build info
        and the source file name enclosed in each gcno file. Git project names
        must resemble paths and may differ from the paths to their project root
        by at most one. If no match is found, then coverage information will
        not be processed.

        e.g. if the project path is test/vts, then its project name may be
             test/vts or <some folder>/test/vts in order to be recognized.

        Args:
            gcda_dict: the dictionary of gcda basenames to gcda content (binary
                       string)
            isGlobal: boolean, True if the coverage data is for the entire
                      test, False if only for the current test case.
        """
        revision_dict = getattr(self, _REVISION_DICT, None)
        checksum_gcno_dict = getattr(self, _CHECKSUM_GCNO_DICT, None)
        output_coverage_report = getattr(
            self, keys.ConfigKeys.IKEY_OUTPUT_COVERAGE_REPORT, False)

        if revision_dict is None or checksum_gcno_dict is None:
            # Both attributes are populated by LoadArtifacts().
            logging.error(
                "Coverage artifacts are not loaded; cannot process coverage.")
            return

        for gcda_name in gcda_dict:
            gcda_stream = io.BytesIO(gcda_dict[gcda_name])
            gcda_file_parser = gcda_parser.GCDAParser(gcda_stream)

            if gcda_file_parser.checksum not in checksum_gcno_dict:
                logging.info("No matching gcno file for gcda: %s", gcda_name)
                continue
            gcno_file_parser = checksum_gcno_dict[gcda_file_parser.checksum]

            try:
                gcno_summary = gcno_file_parser.Parse()
            except FileFormatError:
                logging.error("Error parsing gcno for gcda %s", gcda_name)
                continue

            file_name = gcda_name.rsplit(".", 1)[0]
            src_file_path = self._ExtractSourceName(gcno_summary, file_name)

            if not src_file_path:
                logging.error("No source file found for gcda %s.", gcda_name)
                continue

            # Process and merge gcno/gcda data
            try:
                gcda_file_parser.Parse(gcno_summary)
            except FileFormatError:
                logging.error("Error parsing gcda file %s", gcda_name)
                continue

            # Get the git project information
            # Assumes that the project name and path to the project root are
            # similar
            revision = None
            for project_name in revision_dict:
                # Matches cases when source file root and project name are the
                # same
                if src_file_path.startswith(str(project_name)):
                    git_project_name = str(project_name)
                    git_project_path = str(project_name)
                    revision = str(revision_dict[project_name])
                    logging.info("Source file '%s' matched with project '%s'",
                                 src_file_path, git_project_name)
                    break

                parts = os.path.normpath(str(project_name)).split(os.sep, 1)
                # Matches when project name has an additional prefix before the
                # project path root.
                # NOTE(review): unlike the exact match above, this branch does
                # not stop the search, so a later project can still replace
                # this match -- kept as-is; confirm it is intended.
                if len(parts) > 1 and src_file_path.startswith(parts[-1]):
                    git_project_name = str(project_name)
                    git_project_path = parts[-1]
                    revision = str(revision_dict[project_name])
                    logging.info("Source file '%s' matched with project '%s'",
                                 src_file_path, git_project_name)

            if not revision:
                logging.info("Could not find git info for %s", src_file_path)
                continue

            if self.web and self.web.enabled:
                coverage_vec = coverage_report.GenerateLineCoverageVector(
                    src_file_path, gcno_summary)
                total_count, covered_count = coverage_report.GetCoverageStats(
                    coverage_vec)
                self.web.AddCoverageReport(coverage_vec, src_file_path,
                                           git_project_name, git_project_path,
                                           revision, covered_count,
                                           total_count, isGlobal)

        if output_coverage_report:
            self._OutputCoverageReport(isGlobal)

    def _ManualProcess(self, gcda_dict, isGlobal):
        """Process coverage data and appends coverage reports to the report message.

        Opens the gcno files in the cov_zip for the specified modules and
        matches gcno/gcda files. Then, coverage vectors are generated for each
        set of matching gcno/gcda files and appended as a CoverageReportMessage
        to the provided report message. Unlike AutoProcess, coverage
        information is only processed for the modules explicitly defined in
        'modules'.

        Args:
            gcda_dict: the dictionary of gcda basenames to gcda content (binary
                       string)
            isGlobal: boolean, True if the coverage data is for the entire
                      test, False if only for the current test case.
        """
        cov_zip = getattr(self, _COVERAGE_ZIP, None)
        revision_dict = getattr(self, _REVISION_DICT, None)
        # NOTE(review): the default here is True whereas _AutoProcess defaults
        # to False -- kept as-is; confirm the difference is intended.
        output_coverage_report = getattr(
            self, keys.ConfigKeys.IKEY_OUTPUT_COVERAGE_REPORT, True)
        modules = getattr(self, keys.ConfigKeys.IKEY_MODULES, None)

        if cov_zip is None or revision_dict is None:
            # Both attributes are populated by LoadArtifacts().
            logging.error(
                "Coverage artifacts are not loaded; cannot process coverage.")
            return

        covered_modules = set(cov_zip.namelist())
        for module in modules or []:
            if MODULE_NAME not in module or GIT_PROJECT not in module:
                logging.error(
                    "Coverage module must specify name and git project: %s",
                    module)
                continue
            project = module[GIT_PROJECT]
            if PATH not in project or NAME not in project:
                logging.error("Project name and path not specified: %s",
                              project)
                continue

            name = str(module[MODULE_NAME]) + COVERAGE_SUFFIX
            git_project = str(project[NAME])
            git_project_path = str(project[PATH])

            if name not in covered_modules:
                logging.error("No coverage information for module %s", name)
                continue
            if git_project not in revision_dict:
                logging.error(
                    "Git project not present in device revision dict: %s",
                    git_project)
                continue

            revision = str(revision_dict[git_project])
            archive = archive_parser.Archive(cov_zip.open(name).read())
            try:
                archive.Parse()
            except ValueError:
                logging.error("Archive could not be parsed: %s", name)
                continue

            for gcno_file_path in archive.files:
                file_name_path = gcno_file_path.rsplit(".", 1)[0]
                file_name = os.path.basename(file_name_path)
                gcno_content = archive.files[gcno_file_path]
                gcno_stream = io.BytesIO(gcno_content)
                try:
                    gcno_summary = gcno_parser.GCNOParser(gcno_stream).Parse()
                except FileFormatError:
                    logging.error("Error parsing gcno file %s", gcno_file_path)
                    continue

                # Match gcno file with gcda file
                gcda_name = file_name + GCDA_SUFFIX
                if gcda_name not in gcda_dict:
                    logging.error("No gcda file found %s.", gcda_name)
                    continue

                src_file_path = self._ExtractSourceName(gcno_summary,
                                                        file_name)

                if not src_file_path:
                    logging.error("No source file found for %s.",
                                  gcno_file_path)
                    continue

                # Process and merge gcno/gcda data
                gcda_content = gcda_dict[gcda_name]
                gcda_stream = io.BytesIO(gcda_content)
                try:
                    gcda_parser.GCDAParser(gcda_stream).Parse(gcno_summary)
                except FileFormatError:
                    # Log the file name, not the raw binary payload.
                    logging.error("Error parsing gcda file %s", gcda_name)
                    continue

                if self.web and self.web.enabled:
                    coverage_vec = coverage_report.GenerateLineCoverageVector(
                        src_file_path, gcno_summary)
                    total_count, covered_count = (
                        coverage_report.GetCoverageStats(coverage_vec))
                    self.web.AddCoverageReport(coverage_vec, src_file_path,
                                               git_project, git_project_path,
                                               revision, covered_count,
                                               total_count, isGlobal)

        if output_coverage_report:
            self._OutputCoverageReport(isGlobal)

    def LoadArtifacts(self):
        """Initializes the test for coverage instrumentation.

        Downloads build artifacts from the build server
        (gcno zip file and git revision dictionary) and prepares for coverage
        measurement. The feature is disabled while loading and only re-enabled
        once every artifact has been fetched successfully.

        Requires coverage feature enabled; no-op otherwise.
        """
        if not self.enabled:
            return

        # Stay disabled unless every step below succeeds.
        self.enabled = False

        # Use first device info to get product, flavor, and ID
        # TODO: support multi-device builds
        android_devices = getattr(self, keys.ConfigKeys.IKEY_ANDROID_DEVICE)
        if not isinstance(android_devices, list) or not android_devices:
            logging.warning("android device information not available")
            return

        device_spec = android_devices[0]
        build_flavor = device_spec.get(keys.ConfigKeys.IKEY_BUILD_FLAVOR)
        device_build_id = device_spec.get(keys.ConfigKeys.IKEY_BUILD_ID)

        if not build_flavor or not device_build_id:
            logging.error("Could not read device information.")
            return

        build_flavor = str(build_flavor)
        if "coverage" not in build_flavor:
            build_flavor = "{0}_coverage".format(build_flavor)
        product = build_flavor.split("-", 1)[0]

        # Get service json path
        service_json_path = getattr(self,
                                    keys.ConfigKeys.IKEY_SERVICE_JSON_PATH)

        # Instantiate build client
        try:
            build_client = artifact_fetcher.AndroidBuildClient(
                service_json_path)
        except Exception as e:
            logging.exception('Failed to instantiate build client: %s', e)
            logging.info('Coverage disabled')
            return

        # Fetch repo dictionary
        try:
            revision_dict = build_client.GetRepoDictionary(
                _BRANCH, build_flavor, device_build_id)
            setattr(self, _REVISION_DICT, revision_dict)
        except Exception as e:
            logging.exception('Failed to fetch repo dictionary: %s', e)
            logging.info('Coverage disabled')
            return

        # Fetch coverage zip
        try:
            cov_zip = io.BytesIO(
                build_client.GetCoverage(_BRANCH, build_flavor,
                                         device_build_id, product))
            cov_zip = zipfile.ZipFile(cov_zip)
            setattr(self, _COVERAGE_ZIP, cov_zip)
        except Exception as e:
            logging.exception('Failed to fetch coverage zip: %s', e)
            logging.info('Coverage disabled')
            return

        # The checksum index is only needed by _AutoProcess, which is the
        # path SetCoverageData takes when no modules are configured.
        if not hasattr(self, keys.ConfigKeys.IKEY_MODULES):
            checksum_gcno_dict = self._GetChecksumGcnoDict(cov_zip)
            setattr(self, _CHECKSUM_GCNO_DICT, checksum_gcno_dict)

        self.enabled = True

    def SetCoverageData(self, coverage_data=None, isGlobal=False, dut=None):
        """Sets and processes coverage data.

        Organizes coverage data and processes it into a coverage report in the
        current test case

        Requires feature to be enabled; no-op otherwise.

        Args:
            coverage_data may be either:
                          (1) a dict where gcda name is the key and binary
                              content is the value, or
                          (2) a list of NativeCodeCoverageRawDataMessage objects
                          (3) None if the data will be pulled from dut
            isGlobal: True if the coverage data is for the entire test, False
                      if the coverage data is just for the current test case.
            dut: (optional) the device object for which to pull coverage data
        """
        if not self.enabled:
            return

        if not coverage_data and dut:
            coverage_data = self.GetGcdaDict(dut)

        if not coverage_data:
            logging.info("SetCoverageData: empty coverage data")
            return

        if isinstance(coverage_data, RepeatedCompositeFieldContainer):
            gcda_dict = {}
            for coverage_msg in coverage_data:
                gcda_dict[coverage_msg.file_path] = coverage_msg.gcda
        elif isinstance(coverage_data, dict):
            gcda_dict = coverage_data
        else:
            logging.error("SetCoverageData: unexpected coverage_data type: %s",
                          str(type(coverage_data)))
            return
        logging.info("coverage file paths %s", str(list(gcda_dict)))

        if not hasattr(self, keys.ConfigKeys.IKEY_MODULES):
            # auto-process coverage data
            self._AutoProcess(gcda_dict, isGlobal)
        else:
            # explicitly process coverage data for the specified modules
            self._ManualProcess(gcda_dict, isGlobal)