• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Module for determining coverage of fuzz targets."""
15import json
16import logging
17import os
18import sys
19
20import http_utils
21
22# pylint: disable=wrong-import-position,import-error
23sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
24import utils
25
26# The path to get OSS-Fuzz project's latest report json file.`
27OSS_FUZZ_LATEST_COVERAGE_INFO_PATH = 'oss-fuzz-coverage/latest_report_info/'
28
29
30# pylint: disable=too-few-public-methods
31class CoverageError(Exception):
32  """Exceptions for project coverage."""
33
34
35class BaseCoverage:
36  """Gets coverage data for a project."""
37
38  def __init__(self, repo_path):
39    self.repo_path = _normalize_repo_path(repo_path)
40
41  def get_files_covered_by_target(self, target):
42    """Returns a list of source files covered by the specific fuzz target.
43
44    Args:
45      target: The name of the fuzz target whose coverage is requested.
46
47    Returns:
48      A list of files that the fuzz target covers or None.
49    """
50    target_cov = self.get_target_coverage(target)
51    if not target_cov:
52      logging.info('No coverage available for %s', target)
53      return None
54
55    coverage_per_file = get_coverage_per_file(target_cov)
56    if not coverage_per_file:
57      logging.info('No files found in coverage report.')
58      return None
59
60    affected_file_list = []
61    for file_cov in coverage_per_file:
62      norm_file_path = os.path.normpath(file_cov['filename'])
63      if not norm_file_path.startswith(self.repo_path):
64        # Exclude files outside of the main repo.
65        continue
66
67      if not is_file_covered(file_cov):
68        # Don't consider a file affected if code in it is never executed.
69        continue
70
71      # TODO(metzman): It's weird to me that we access file_cov['filename']
72      # again and not norm_file_path, figure out if this makes sense.
73      relative_path = utils.remove_prefix(file_cov['filename'], self.repo_path)
74      affected_file_list.append(relative_path)
75
76    return affected_file_list
77
78  def get_target_coverage(self, target):
79    """Get the coverage report for a specific fuzz target.
80
81    Args:
82      target: The name of the fuzz target whose coverage is requested.
83
84    Returns:
85      The target's coverage json dict or None on failure.
86    """
87    raise NotImplementedError('Child class must implement method.')
88
89
90class OSSFuzzCoverage(BaseCoverage):
91  """Gets coverage data for a project from OSS-Fuzz."""
92
93  def __init__(self, repo_path, oss_fuzz_project_name):
94    """Constructor for OSSFuzzCoverage."""
95    super().__init__(repo_path)
96    self.oss_fuzz_project_name = oss_fuzz_project_name
97    self.fuzzer_stats_url = _get_oss_fuzz_fuzzer_stats_dir_url(
98        self.oss_fuzz_project_name)
99    if self.fuzzer_stats_url is None:
100      raise CoverageError('Could not get latest coverage.')
101
102  def get_target_coverage(self, target):
103    """Get the coverage report for a specific fuzz target.
104
105    Args:
106      target: The name of the fuzz target whose coverage is requested.
107
108    Returns:
109      The target's coverage json dict or None on failure.
110    """
111    if not self.fuzzer_stats_url:
112      return None
113
114    target_url = utils.url_join(self.fuzzer_stats_url, target + '.json')
115    return http_utils.get_json_from_url(target_url)
116
117
118def _get_oss_fuzz_latest_cov_report_info(oss_fuzz_project_name):
119  """Gets and returns a dictionary containing the latest coverage report info
120  for |project|."""
121  latest_report_info_url = utils.url_join(utils.GCS_BASE_URL,
122                                          OSS_FUZZ_LATEST_COVERAGE_INFO_PATH,
123                                          oss_fuzz_project_name + '.json')
124  latest_cov_info = http_utils.get_json_from_url(latest_report_info_url)
125  if latest_cov_info is None:
126    logging.error('Could not get the coverage report json from url: %s.',
127                  latest_report_info_url)
128    return None
129  return latest_cov_info
130
131
132def _get_oss_fuzz_fuzzer_stats_dir_url(oss_fuzz_project_name):
133  """Gets latest coverage report info for a specific OSS-Fuzz project from
134  GCS.
135
136  Args:
137    oss_fuzz_project_name: The name of the project.
138
139  Returns:
140    The projects coverage report info in json dict or None on failure.
141  """
142  latest_cov_info = _get_oss_fuzz_latest_cov_report_info(oss_fuzz_project_name)
143
144  if not latest_cov_info:
145    return None
146
147  if 'fuzzer_stats_dir' not in latest_cov_info:
148    logging.error('fuzzer_stats_dir not in latest coverage info.')
149    return None
150
151  fuzzer_stats_dir_gs_url = latest_cov_info['fuzzer_stats_dir']
152  fuzzer_stats_dir_url = utils.gs_url_to_https(fuzzer_stats_dir_gs_url)
153  return fuzzer_stats_dir_url
154
155
156class FilesystemCoverage(BaseCoverage):
157  """Class that gets a project's coverage from the filesystem."""
158
159  def __init__(self, repo_path, project_coverage_dir):
160    super().__init__(repo_path)
161    self.project_coverage_dir = project_coverage_dir
162
163  def get_target_coverage(self, target):
164    """Get the coverage report for a specific fuzz target.
165
166    Args:
167      target: The name of the fuzz target whose coverage is requested.
168
169    Returns:
170      The target's coverage json dict or None on failure.
171    """
172    logging.info('Getting coverage for %s from filesystem.', target)
173    fuzzer_stats_json_path = os.path.join(self.project_coverage_dir,
174                                          'fuzzer_stats', target + '.json')
175    if not os.path.exists(fuzzer_stats_json_path):
176      logging.warning('%s does not exist.', fuzzer_stats_json_path)
177      return None
178
179    with open(fuzzer_stats_json_path) as fuzzer_stats_json_file_handle:
180      try:
181        return json.load(fuzzer_stats_json_file_handle)
182      except json.decoder.JSONDecodeError as err:
183        logging.error('Could not decode: %s. Error: %s.',
184                      fuzzer_stats_json_path, err)
185        return None
186
187
188def is_file_covered(file_cov):
189  """Returns whether the file is covered."""
190  return file_cov['summary']['regions']['covered']
191
192
193def get_coverage_per_file(target_cov):
194  """Returns the coverage per file within |target_cov|."""
195  return target_cov['data'][0]['files']
196
197
198def _normalize_repo_path(repo_path):
199  """Normalizes and returns |repo_path| to make sure cases like /src/curl and
200  /src/curl/ are both handled."""
201  repo_path = os.path.normpath(repo_path)
202  if not repo_path.endswith('/'):
203    repo_path += '/'
204  return repo_path
205