1# Copyright 2021 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14"""Tests for running fuzzers.""" 15import os 16import sys 17import shutil 18import tempfile 19import unittest 20from unittest import mock 21 22import parameterized 23from pyfakefs import fake_filesystem_unittest 24 25import config_utils 26import fuzz_target 27import run_fuzzers 28 29# pylint: disable=wrong-import-position 30INFRA_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) 31sys.path.append(INFRA_DIR) 32 33import test_helpers 34 35# NOTE: This integration test relies on 36# https://github.com/google/oss-fuzz/tree/master/projects/example project. 37EXAMPLE_PROJECT = 'example' 38 39# Location of files used for testing. 40TEST_DATA_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), 41 'test_data') 42 43MEMORY_FUZZER_DIR = os.path.join(TEST_DATA_PATH, 'memory') 44MEMORY_FUZZER = 'curl_fuzzer_memory' 45 46UNDEFINED_FUZZER_DIR = os.path.join(TEST_DATA_PATH, 'undefined') 47UNDEFINED_FUZZER = 'curl_fuzzer_undefined' 48 49FUZZ_SECONDS = 10 50 51 52def _create_config(**kwargs): 53 """Creates a config object and then sets every attribute that is a key in 54 |kwargs| to the corresponding value. Asserts that each key in |kwargs| is an 55 attribute of Config.""" 56 with mock.patch('os.path.basename', return_value=None), mock.patch( 57 'config_utils.get_project_src_path', 58 return_value=None), mock.patch('config_utils._is_dry_run', 59 return_value=True): 60 config = config_utils.RunFuzzersConfig() 61 62 for key, value in kwargs.items(): 63 assert hasattr(config, key), 'Config doesn\'t have attribute: ' + key 64 setattr(config, key, value) 65 return config 66 67 68class RunFuzzerIntegrationTestMixin: # pylint: disable=too-few-public-methods,invalid-name 69 """Mixin for integration test classes that runbuild_fuzzers on builds of a 70 specific sanitizer.""" 71 # These must be defined by children. 72 FUZZER_DIR = None 73 FUZZER = None 74 75 def _test_run_with_sanitizer(self, fuzzer_dir, sanitizer): 76 """Calls run_fuzzers on fuzzer_dir and |sanitizer| and asserts 77 the run succeeded and that no bug was found.""" 78 with test_helpers.temp_dir_copy(fuzzer_dir) as fuzzer_dir_copy: 79 config = _create_config(fuzz_seconds=FUZZ_SECONDS, 80 workspace=fuzzer_dir_copy, 81 project_name='curl', 82 sanitizer=sanitizer) 83 result = run_fuzzers.run_fuzzers(config) 84 self.assertEqual(result, run_fuzzers.RunFuzzersResult.NO_BUG_FOUND) 85 86 87class RunMemoryFuzzerIntegrationTest(RunFuzzerIntegrationTestMixin, 88 unittest.TestCase): 89 """Integration test for build_fuzzers with an MSAN build.""" 90 FUZZER_DIR = MEMORY_FUZZER_DIR 91 FUZZER = MEMORY_FUZZER 92 93 @unittest.skipIf(not os.getenv('INTEGRATION_TESTS'), 94 'INTEGRATION_TESTS=1 not set') 95 def test_run_with_memory_sanitizer(self): 96 """Tests run_fuzzers with a valid MSAN build.""" 97 self._test_run_with_sanitizer(self.FUZZER_DIR, 'memory') 98 99 100class RunUndefinedFuzzerIntegrationTest(RunFuzzerIntegrationTestMixin, 101 unittest.TestCase): 102 """Integration test for build_fuzzers with an UBSAN build.""" 103 FUZZER_DIR = UNDEFINED_FUZZER_DIR 104 FUZZER = UNDEFINED_FUZZER 105 106 @unittest.skipIf(not os.getenv('INTEGRATION_TESTS'), 107 'INTEGRATION_TESTS=1 not set') 108 def test_run_with_undefined_sanitizer(self): 109 """Tests run_fuzzers with a valid UBSAN build.""" 110 self._test_run_with_sanitizer(self.FUZZER_DIR, 'undefined') 111 112 113class BaseFuzzTargetRunnerTest(unittest.TestCase): 114 """Tests BaseFuzzTargetRunner.""" 115 116 def _create_runner(self, **kwargs): # pylint: disable=no-self-use 117 defaults = {'fuzz_seconds': FUZZ_SECONDS, 'project_name': EXAMPLE_PROJECT} 118 for default_key, default_value in defaults.items(): 119 if default_key not in kwargs: 120 kwargs[default_key] = default_value 121 122 config = _create_config(**kwargs) 123 return run_fuzzers.BaseFuzzTargetRunner(config) 124 125 def _test_initialize_fail(self, expected_error_args, **create_runner_kwargs): 126 with mock.patch('logging.error') as mocked_error: 127 runner = self._create_runner(**create_runner_kwargs) 128 self.assertFalse(runner.initialize()) 129 mocked_error.assert_called_with(*expected_error_args) 130 131 @parameterized.parameterized.expand([(0,), (None,), (-1,)]) 132 def test_initialize_invalid_fuzz_seconds(self, fuzz_seconds): 133 """Tests initialize fails with an invalid fuzz seconds.""" 134 expected_error_args = ('Fuzz_seconds argument must be greater than 1, ' 135 'but was: %s.', fuzz_seconds) 136 with tempfile.TemporaryDirectory() as tmp_dir: 137 out_path = os.path.join(tmp_dir, 'out') 138 os.mkdir(out_path) 139 with mock.patch('utils.get_fuzz_targets') as mocked_get_fuzz_targets: 140 mocked_get_fuzz_targets.return_value = [ 141 os.path.join(out_path, 'fuzz_target') 142 ] 143 self._test_initialize_fail(expected_error_args, 144 fuzz_seconds=fuzz_seconds, 145 workspace=tmp_dir) 146 147 def test_initialize_no_out_dir(self): 148 """Tests initialize fails with no out dir.""" 149 with tempfile.TemporaryDirectory() as tmp_dir: 150 out_path = os.path.join(tmp_dir, 'out') 151 expected_error_args = ('Out directory: %s does not exist.', out_path) 152 self._test_initialize_fail(expected_error_args, workspace=tmp_dir) 153 154 def test_initialize_nonempty_artifacts(self): 155 """Tests initialize with a file artifacts path.""" 156 with tempfile.TemporaryDirectory() as tmp_dir: 157 out_path = os.path.join(tmp_dir, 'out') 158 os.mkdir(out_path) 159 artifacts_path = os.path.join(out_path, 'artifacts') 160 with open(artifacts_path, 'w') as artifacts_handle: 161 artifacts_handle.write('fake') 162 expected_error_args = ( 163 'Artifacts path: %s exists and is not an empty directory.', 164 artifacts_path) 165 self._test_initialize_fail(expected_error_args, workspace=tmp_dir) 166 167 def test_initialize_bad_artifacts(self): 168 """Tests initialize with a non-empty artifacts path.""" 169 with tempfile.TemporaryDirectory() as tmp_dir: 170 out_path = os.path.join(tmp_dir, 'out') 171 artifacts_path = os.path.join(out_path, 'artifacts') 172 os.makedirs(artifacts_path) 173 artifact_path = os.path.join(artifacts_path, 'artifact') 174 with open(artifact_path, 'w') as artifact_handle: 175 artifact_handle.write('fake') 176 expected_error_args = ( 177 'Artifacts path: %s exists and is not an empty directory.', 178 artifacts_path) 179 self._test_initialize_fail(expected_error_args, workspace=tmp_dir) 180 181 @mock.patch('utils.get_fuzz_targets') 182 @mock.patch('logging.error') 183 def test_initialize_empty_artifacts(self, mocked_log_error, 184 mocked_get_fuzz_targets): 185 """Tests initialize with an empty artifacts dir.""" 186 mocked_get_fuzz_targets.return_value = ['fuzz-target'] 187 with tempfile.TemporaryDirectory() as tmp_dir: 188 out_path = os.path.join(tmp_dir, 'out') 189 artifacts_path = os.path.join(out_path, 'artifacts') 190 os.makedirs(artifacts_path) 191 runner = self._create_runner(workspace=tmp_dir) 192 self.assertTrue(runner.initialize()) 193 mocked_log_error.assert_not_called() 194 self.assertTrue(os.path.isdir(artifacts_path)) 195 196 @mock.patch('utils.get_fuzz_targets') 197 @mock.patch('logging.error') 198 def test_initialize_no_artifacts(self, mocked_log_error, 199 mocked_get_fuzz_targets): 200 """Tests initialize with no artifacts dir (the expected setting).""" 201 mocked_get_fuzz_targets.return_value = ['fuzz-target'] 202 with tempfile.TemporaryDirectory() as tmp_dir: 203 out_path = os.path.join(tmp_dir, 'out') 204 os.makedirs(out_path) 205 runner = self._create_runner(workspace=tmp_dir) 206 self.assertTrue(runner.initialize()) 207 mocked_log_error.assert_not_called() 208 self.assertTrue(os.path.isdir(os.path.join(out_path, 'artifacts'))) 209 210 def test_initialize_no_fuzz_targets(self): 211 """Tests initialize with no fuzz targets.""" 212 with tempfile.TemporaryDirectory() as tmp_dir: 213 out_path = os.path.join(tmp_dir, 'out') 214 os.makedirs(out_path) 215 expected_error_args = ('No fuzz targets were found in out directory: %s.', 216 out_path) 217 self._test_initialize_fail(expected_error_args, workspace=tmp_dir) 218 219 def test_get_fuzz_target_artifact(self): 220 """Tests that get_fuzz_target_artifact works as intended.""" 221 runner = self._create_runner() 222 artifacts_dir = 'artifacts-dir' 223 runner.artifacts_dir = artifacts_dir 224 artifact_name = 'artifact-name' 225 target = mock.MagicMock() 226 target_name = 'target_name' 227 target.target_name = target_name 228 fuzz_target_artifact = runner.get_fuzz_target_artifact( 229 target, artifact_name) 230 expected_fuzz_target_artifact = ( 231 'artifacts-dir/target_name-address-artifact-name') 232 self.assertEqual(fuzz_target_artifact, expected_fuzz_target_artifact) 233 234 235class CiFuzzTargetRunnerTest(fake_filesystem_unittest.TestCase): 236 """Tests that CiFuzzTargetRunner works as intended.""" 237 238 def setUp(self): 239 self.setUpPyfakefs() 240 241 @mock.patch('utils.get_fuzz_targets') 242 @mock.patch('run_fuzzers.CiFuzzTargetRunner.run_fuzz_target') 243 @mock.patch('run_fuzzers.CiFuzzTargetRunner.create_fuzz_target_obj') 244 def test_run_fuzz_targets_quits(self, mocked_create_fuzz_target_obj, 245 mocked_run_fuzz_target, 246 mocked_get_fuzz_targets): 247 """Tests that run_fuzz_targets quits on the first crash it finds.""" 248 workspace = 'workspace' 249 out_path = os.path.join(workspace, 'out') 250 self.fs.create_dir(out_path) 251 config = _create_config(fuzz_seconds=FUZZ_SECONDS, 252 workspace=workspace, 253 project_name=EXAMPLE_PROJECT) 254 runner = run_fuzzers.CiFuzzTargetRunner(config) 255 256 mocked_get_fuzz_targets.return_value = ['target1', 'target2'] 257 runner.initialize() 258 testcase = os.path.join(workspace, 'testcase') 259 self.fs.create_file(testcase) 260 stacktrace = b'stacktrace' 261 mocked_run_fuzz_target.return_value = fuzz_target.FuzzResult( 262 testcase, stacktrace) 263 magic_mock = mock.MagicMock() 264 magic_mock.target_name = 'target1' 265 mocked_create_fuzz_target_obj.return_value = magic_mock 266 self.assertTrue(runner.run_fuzz_targets()) 267 self.assertIn('target1-address-testcase', os.listdir(runner.artifacts_dir)) 268 self.assertEqual(mocked_run_fuzz_target.call_count, 1) 269 270 271class BatchFuzzTargetRunnerTest(fake_filesystem_unittest.TestCase): 272 """Tests that CiFuzzTargetRunner works as intended.""" 273 274 def setUp(self): 275 self.setUpPyfakefs() 276 277 @mock.patch('utils.get_fuzz_targets') 278 @mock.patch('run_fuzzers.BatchFuzzTargetRunner.run_fuzz_target') 279 @mock.patch('run_fuzzers.BatchFuzzTargetRunner.create_fuzz_target_obj') 280 def test_run_fuzz_targets_quits(self, mocked_create_fuzz_target_obj, 281 mocked_run_fuzz_target, 282 mocked_get_fuzz_targets): 283 """Tests that run_fuzz_targets doesn't quit on the first crash it finds.""" 284 workspace = 'workspace' 285 out_path = os.path.join(workspace, 'out') 286 self.fs.create_dir(out_path) 287 config = _create_config(fuzz_seconds=FUZZ_SECONDS, 288 workspace=workspace, 289 project_name=EXAMPLE_PROJECT) 290 runner = run_fuzzers.BatchFuzzTargetRunner(config) 291 292 mocked_get_fuzz_targets.return_value = ['target1', 'target2'] 293 runner.initialize() 294 testcase1 = os.path.join(workspace, 'testcase-aaa') 295 testcase2 = os.path.join(workspace, 'testcase-bbb') 296 self.fs.create_file(testcase1) 297 self.fs.create_file(testcase2) 298 stacktrace = b'stacktrace' 299 call_count = 0 300 301 def mock_run_fuzz_target(_): 302 nonlocal call_count 303 if call_count == 0: 304 testcase = testcase1 305 elif call_count == 1: 306 testcase = testcase2 307 assert call_count != 2 308 call_count += 1 309 return fuzz_target.FuzzResult(testcase, stacktrace) 310 311 mocked_run_fuzz_target.side_effect = mock_run_fuzz_target 312 magic_mock = mock.MagicMock() 313 magic_mock.target_name = 'target1' 314 mocked_create_fuzz_target_obj.return_value = magic_mock 315 self.assertTrue(runner.run_fuzz_targets()) 316 self.assertIn('target1-address-testcase-aaa', 317 os.listdir(runner.artifacts_dir)) 318 self.assertEqual(mocked_run_fuzz_target.call_count, 2) 319 320 321class RunAddressFuzzersIntegrationTest(RunFuzzerIntegrationTestMixin, 322 unittest.TestCase): 323 """Integration tests for build_fuzzers with an ASAN build.""" 324 325 BUILD_DIR_NAME = 'cifuzz-latest-build' 326 327 @unittest.skipIf(not os.getenv('INTEGRATION_TESTS'), 328 'INTEGRATION_TESTS=1 not set') 329 def test_new_bug_found(self): 330 """Tests run_fuzzers with a valid ASAN build.""" 331 # Set the first return value to True, then the second to False to 332 # emulate a bug existing in the current PR but not on the downloaded 333 # OSS-Fuzz build. 334 with mock.patch('fuzz_target.FuzzTarget.is_reproducible', 335 side_effect=[True, False]): 336 with tempfile.TemporaryDirectory() as tmp_dir: 337 workspace = os.path.join(tmp_dir, 'workspace') 338 shutil.copytree(TEST_DATA_PATH, workspace) 339 config = _create_config(fuzz_seconds=FUZZ_SECONDS, 340 workspace=workspace, 341 project_name=EXAMPLE_PROJECT) 342 result = run_fuzzers.run_fuzzers(config) 343 self.assertEqual(result, run_fuzzers.RunFuzzersResult.BUG_FOUND) 344 build_dir = os.path.join(workspace, 'out', self.BUILD_DIR_NAME) 345 self.assertNotEqual(0, len(os.listdir(build_dir))) 346 347 @unittest.skipIf(not os.getenv('INTEGRATION_TESTS'), 348 'INTEGRATION_TESTS=1 not set') 349 @mock.patch('fuzz_target.FuzzTarget.is_reproducible', 350 side_effect=[True, True]) 351 def test_old_bug_found(self, _): 352 """Tests run_fuzzers with a bug found in OSS-Fuzz before.""" 353 config = _create_config(fuzz_seconds=FUZZ_SECONDS, 354 workspace=TEST_DATA_PATH, 355 project_name=EXAMPLE_PROJECT) 356 with tempfile.TemporaryDirectory() as tmp_dir: 357 workspace = os.path.join(tmp_dir, 'workspace') 358 shutil.copytree(TEST_DATA_PATH, workspace) 359 config = _create_config(fuzz_seconds=FUZZ_SECONDS, 360 workspace=TEST_DATA_PATH, 361 project_name=EXAMPLE_PROJECT) 362 result = run_fuzzers.run_fuzzers(config) 363 self.assertEqual(result, run_fuzzers.RunFuzzersResult.NO_BUG_FOUND) 364 build_dir = os.path.join(TEST_DATA_PATH, 'out', self.BUILD_DIR_NAME) 365 self.assertTrue(os.path.exists(build_dir)) 366 self.assertNotEqual(0, len(os.listdir(build_dir))) 367 368 def test_invalid_build(self): 369 """Tests run_fuzzers with an invalid ASAN build.""" 370 with tempfile.TemporaryDirectory() as tmp_dir: 371 out_path = os.path.join(tmp_dir, 'out') 372 os.mkdir(out_path) 373 config = _create_config(fuzz_seconds=FUZZ_SECONDS, 374 workspace=tmp_dir, 375 project_name=EXAMPLE_PROJECT) 376 result = run_fuzzers.run_fuzzers(config) 377 self.assertEqual(result, run_fuzzers.RunFuzzersResult.ERROR) 378 379 380if __name__ == '__main__': 381 unittest.main() 382