# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Module for determining coverage of fuzz targets."""
import logging
import os
import sys
import json
import urllib.error
import urllib.request

# pylint: disable=wrong-import-position,import-error
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import utils

# The path to get project's latest report json file.
LATEST_REPORT_INFO_PATH = 'oss-fuzz-coverage/latest_report_info/'


class OssFuzzCoverageGetter:
  """Gets coverage data for a project from OSS-Fuzz."""

  def __init__(self, project_name, repo_path):
    """Constructor for OssFuzzCoverageGetter. Callers should check that
    fuzzer_stats_url is initialized (it is None when the latest coverage
    report info could not be retrieved).

    Args:
      project_name: The name of the relevant OSS-Fuzz project.
      repo_path: The path of the project's repo as it appears in coverage
        reports. It is normalized to end with exactly one trailing '/'.
    """
    self.project_name = project_name
    self.repo_path = _normalize_repo_path(repo_path)
    self.fuzzer_stats_url = _get_fuzzer_stats_dir_url(self.project_name)

  def get_target_coverage_report(self, target):
    """Get the coverage report for a specific fuzz target.

    Args:
      target: The name of the fuzz target whose coverage is requested.

    Returns:
      The target's coverage json dict or None on failure.
    """
    if not self.fuzzer_stats_url:
      return None

    target_url = utils.url_join(self.fuzzer_stats_url, target + '.json')
    return get_json_from_url(target_url)

  def get_files_covered_by_target(self, target):
    """Gets a list of source files covered by the specific fuzz target.

    Args:
      target: The name of the fuzz target whose coverage is requested.

    Returns:
      A list of files (with the repo path prefix removed) that the fuzz
      target covers, or None if the coverage report could not be obtained or
      lists no files.
    """
    target_cov = self.get_target_coverage_report(target)
    if not target_cov:
      return None

    coverage_per_file = get_coverage_per_file(target_cov)
    if not coverage_per_file:
      logging.info('No files found in coverage report.')
      return None

    affected_file_list = []
    for file_cov in coverage_per_file:
      norm_file_path = os.path.normpath(file_cov['filename'])
      if not norm_file_path.startswith(self.repo_path):
        # Exclude files outside of the main repo.
        continue

      if not is_file_covered(file_cov):
        # Don't consider a file affected if code in it is never executed.
        continue

      # Strip the repo prefix from the same normalized path that passed the
      # startswith() check above, so the check and the strip cannot disagree
      # when the raw filename is not already in normalized form.
      relative_path = utils.remove_prefix(norm_file_path, self.repo_path)
      affected_file_list.append(relative_path)

    return affected_file_list


def is_file_covered(file_cov):
  """Returns whether the file is covered.

  The value returned is the 'covered' entry of the file's region summary,
  which callers use for its truthiness.
  """
  return file_cov['summary']['regions']['covered']


def get_coverage_per_file(target_cov):
  """Returns the coverage per file within |target_cov|.

  This is the 'files' entry of the first element of |target_cov|['data'].
  """
  return target_cov['data'][0]['files']


def _normalize_repo_path(repo_path):
  """Normalizes and returns |repo_path| to make sure cases like /src/curl and
  /src/curl/ are both handled. The result always ends with '/'."""
  repo_path = os.path.normpath(repo_path)
  if not repo_path.endswith('/'):
    repo_path += '/'
  return repo_path


def _get_latest_cov_report_info(project_name):
  """Gets and returns a dictionary containing the latest coverage report info
  for |project_name|, or None if it could not be downloaded or parsed."""
  latest_report_info_url = utils.url_join(utils.GCS_BASE_URL,
                                          LATEST_REPORT_INFO_PATH,
                                          project_name + '.json')
  latest_cov_info = get_json_from_url(latest_report_info_url)
  if latest_cov_info is None:
    logging.error('Could not get the coverage report json from url: %s.',
                  latest_report_info_url)
    return None
  return latest_cov_info


def _get_fuzzer_stats_dir_url(project_name):
  """Gets the URL of the fuzzer stats directory from the latest coverage
  report info of a specific OSS-Fuzz project.

  Args:
    project_name: The name of the relevant OSS-Fuzz project.

  Returns:
    The HTTPS URL derived from the report info's 'fuzzer_stats_dir' entry, or
    None if the report info is unavailable or lacks that entry.
  """
  latest_cov_info = _get_latest_cov_report_info(project_name)

  if not latest_cov_info:
    return None

  if 'fuzzer_stats_dir' not in latest_cov_info:
    logging.error('fuzzer_stats_dir not in latest coverage info.')
    return None

  fuzzer_stats_dir_gs_url = latest_cov_info['fuzzer_stats_dir']
  fuzzer_stats_dir_url = utils.gs_url_to_https(fuzzer_stats_dir_gs_url)
  return fuzzer_stats_dir_url


def get_json_from_url(url):
  """Gets a json object from a specified HTTP URL.

  Only urllib.error.HTTPError is handled when opening the URL; any other
  exception raised by urlopen propagates to the caller.

  Args:
    url: The url of the json to be downloaded.

  Returns:
    A dictionary deserialized from JSON or None on failure.
  """
  try:
    response = urllib.request.urlopen(url)
  except urllib.error.HTTPError:
    logging.error('HTTP error with url %s.', url)
    return None

  try:
    # read().decode() fixes compatibility issue with urllib response object.
    result_json = json.loads(response.read().decode())
  except (ValueError, TypeError, json.JSONDecodeError) as err:
    logging.error('Loading json from url %s failed with: %s.', url, str(err))
    return None
  finally:
    # Always release the underlying connection, whether or not parsing
    # succeeded; previously the response was never closed.
    response.close()
  return result_json