# Copyright 2021 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Tests for running fuzzers.""" import os import sys import shutil import tempfile import unittest from unittest import mock import parameterized from pyfakefs import fake_filesystem_unittest import config_utils import fuzz_target import run_fuzzers # pylint: disable=wrong-import-position INFRA_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.append(INFRA_DIR) import test_helpers # NOTE: This integration test relies on # https://github.com/google/oss-fuzz/tree/master/projects/example project. EXAMPLE_PROJECT = 'example' # Location of files used for testing. TEST_DATA_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'test_data') MEMORY_FUZZER_DIR = os.path.join(TEST_DATA_PATH, 'memory') MEMORY_FUZZER = 'curl_fuzzer_memory' UNDEFINED_FUZZER_DIR = os.path.join(TEST_DATA_PATH, 'undefined') UNDEFINED_FUZZER = 'curl_fuzzer_undefined' FUZZ_SECONDS = 10 def _create_config(**kwargs): """Creates a config object and then sets every attribute that is a key in |kwargs| to the corresponding value. Asserts that each key in |kwargs| is an attribute of Config.""" with mock.patch('os.path.basename', return_value=None), mock.patch( 'config_utils.get_project_src_path', return_value=None), mock.patch('config_utils._is_dry_run', return_value=True): config = config_utils.RunFuzzersConfig() for key, value in kwargs.items(): assert hasattr(config, key), 'Config doesn\'t have attribute: ' + key setattr(config, key, value) return config class RunFuzzerIntegrationTestMixin: # pylint: disable=too-few-public-methods,invalid-name """Mixin for integration test classes that runbuild_fuzzers on builds of a specific sanitizer.""" # These must be defined by children. FUZZER_DIR = None FUZZER = None def _test_run_with_sanitizer(self, fuzzer_dir, sanitizer): """Calls run_fuzzers on fuzzer_dir and |sanitizer| and asserts the run succeeded and that no bug was found.""" with test_helpers.temp_dir_copy(fuzzer_dir) as fuzzer_dir_copy: config = _create_config(fuzz_seconds=FUZZ_SECONDS, workspace=fuzzer_dir_copy, project_name='curl', sanitizer=sanitizer) result = run_fuzzers.run_fuzzers(config) self.assertEqual(result, run_fuzzers.RunFuzzersResult.NO_BUG_FOUND) class RunMemoryFuzzerIntegrationTest(RunFuzzerIntegrationTestMixin, unittest.TestCase): """Integration test for build_fuzzers with an MSAN build.""" FUZZER_DIR = MEMORY_FUZZER_DIR FUZZER = MEMORY_FUZZER @unittest.skipIf(not os.getenv('INTEGRATION_TESTS'), 'INTEGRATION_TESTS=1 not set') def test_run_with_memory_sanitizer(self): """Tests run_fuzzers with a valid MSAN build.""" self._test_run_with_sanitizer(self.FUZZER_DIR, 'memory') class RunUndefinedFuzzerIntegrationTest(RunFuzzerIntegrationTestMixin, unittest.TestCase): """Integration test for build_fuzzers with an UBSAN build.""" FUZZER_DIR = UNDEFINED_FUZZER_DIR FUZZER = UNDEFINED_FUZZER @unittest.skipIf(not os.getenv('INTEGRATION_TESTS'), 'INTEGRATION_TESTS=1 not set') def test_run_with_undefined_sanitizer(self): """Tests run_fuzzers with a valid UBSAN build.""" self._test_run_with_sanitizer(self.FUZZER_DIR, 'undefined') class BaseFuzzTargetRunnerTest(unittest.TestCase): """Tests BaseFuzzTargetRunner.""" def _create_runner(self, **kwargs): # pylint: disable=no-self-use defaults = {'fuzz_seconds': FUZZ_SECONDS, 'project_name': EXAMPLE_PROJECT} for default_key, default_value in defaults.items(): if default_key not in kwargs: kwargs[default_key] = default_value config = _create_config(**kwargs) return run_fuzzers.BaseFuzzTargetRunner(config) def _test_initialize_fail(self, expected_error_args, **create_runner_kwargs): with mock.patch('logging.error') as mocked_error: runner = self._create_runner(**create_runner_kwargs) self.assertFalse(runner.initialize()) mocked_error.assert_called_with(*expected_error_args) @parameterized.parameterized.expand([(0,), (None,), (-1,)]) def test_initialize_invalid_fuzz_seconds(self, fuzz_seconds): """Tests initialize fails with an invalid fuzz seconds.""" expected_error_args = ('Fuzz_seconds argument must be greater than 1, ' 'but was: %s.', fuzz_seconds) with tempfile.TemporaryDirectory() as tmp_dir: out_path = os.path.join(tmp_dir, 'out') os.mkdir(out_path) with mock.patch('utils.get_fuzz_targets') as mocked_get_fuzz_targets: mocked_get_fuzz_targets.return_value = [ os.path.join(out_path, 'fuzz_target') ] self._test_initialize_fail(expected_error_args, fuzz_seconds=fuzz_seconds, workspace=tmp_dir) def test_initialize_no_out_dir(self): """Tests initialize fails with no out dir.""" with tempfile.TemporaryDirectory() as tmp_dir: out_path = os.path.join(tmp_dir, 'out') expected_error_args = ('Out directory: %s does not exist.', out_path) self._test_initialize_fail(expected_error_args, workspace=tmp_dir) def test_initialize_nonempty_artifacts(self): """Tests initialize with a file artifacts path.""" with tempfile.TemporaryDirectory() as tmp_dir: out_path = os.path.join(tmp_dir, 'out') os.mkdir(out_path) artifacts_path = os.path.join(out_path, 'artifacts') with open(artifacts_path, 'w') as artifacts_handle: artifacts_handle.write('fake') expected_error_args = ( 'Artifacts path: %s exists and is not an empty directory.', artifacts_path) self._test_initialize_fail(expected_error_args, workspace=tmp_dir) def test_initialize_bad_artifacts(self): """Tests initialize with a non-empty artifacts path.""" with tempfile.TemporaryDirectory() as tmp_dir: out_path = os.path.join(tmp_dir, 'out') artifacts_path = os.path.join(out_path, 'artifacts') os.makedirs(artifacts_path) artifact_path = os.path.join(artifacts_path, 'artifact') with open(artifact_path, 'w') as artifact_handle: artifact_handle.write('fake') expected_error_args = ( 'Artifacts path: %s exists and is not an empty directory.', artifacts_path) self._test_initialize_fail(expected_error_args, workspace=tmp_dir) @mock.patch('utils.get_fuzz_targets') @mock.patch('logging.error') def test_initialize_empty_artifacts(self, mocked_log_error, mocked_get_fuzz_targets): """Tests initialize with an empty artifacts dir.""" mocked_get_fuzz_targets.return_value = ['fuzz-target'] with tempfile.TemporaryDirectory() as tmp_dir: out_path = os.path.join(tmp_dir, 'out') artifacts_path = os.path.join(out_path, 'artifacts') os.makedirs(artifacts_path) runner = self._create_runner(workspace=tmp_dir) self.assertTrue(runner.initialize()) mocked_log_error.assert_not_called() self.assertTrue(os.path.isdir(artifacts_path)) @mock.patch('utils.get_fuzz_targets') @mock.patch('logging.error') def test_initialize_no_artifacts(self, mocked_log_error, mocked_get_fuzz_targets): """Tests initialize with no artifacts dir (the expected setting).""" mocked_get_fuzz_targets.return_value = ['fuzz-target'] with tempfile.TemporaryDirectory() as tmp_dir: out_path = os.path.join(tmp_dir, 'out') os.makedirs(out_path) runner = self._create_runner(workspace=tmp_dir) self.assertTrue(runner.initialize()) mocked_log_error.assert_not_called() self.assertTrue(os.path.isdir(os.path.join(out_path, 'artifacts'))) def test_initialize_no_fuzz_targets(self): """Tests initialize with no fuzz targets.""" with tempfile.TemporaryDirectory() as tmp_dir: out_path = os.path.join(tmp_dir, 'out') os.makedirs(out_path) expected_error_args = ('No fuzz targets were found in out directory: %s.', out_path) self._test_initialize_fail(expected_error_args, workspace=tmp_dir) def test_get_fuzz_target_artifact(self): """Tests that get_fuzz_target_artifact works as intended.""" runner = self._create_runner() artifacts_dir = 'artifacts-dir' runner.artifacts_dir = artifacts_dir artifact_name = 'artifact-name' target = mock.MagicMock() target_name = 'target_name' target.target_name = target_name fuzz_target_artifact = runner.get_fuzz_target_artifact( target, artifact_name) expected_fuzz_target_artifact = ( 'artifacts-dir/target_name-address-artifact-name') self.assertEqual(fuzz_target_artifact, expected_fuzz_target_artifact) class CiFuzzTargetRunnerTest(fake_filesystem_unittest.TestCase): """Tests that CiFuzzTargetRunner works as intended.""" def setUp(self): self.setUpPyfakefs() @mock.patch('utils.get_fuzz_targets') @mock.patch('run_fuzzers.CiFuzzTargetRunner.run_fuzz_target') @mock.patch('run_fuzzers.CiFuzzTargetRunner.create_fuzz_target_obj') def test_run_fuzz_targets_quits(self, mocked_create_fuzz_target_obj, mocked_run_fuzz_target, mocked_get_fuzz_targets): """Tests that run_fuzz_targets quits on the first crash it finds.""" workspace = 'workspace' out_path = os.path.join(workspace, 'out') self.fs.create_dir(out_path) config = _create_config(fuzz_seconds=FUZZ_SECONDS, workspace=workspace, project_name=EXAMPLE_PROJECT) runner = run_fuzzers.CiFuzzTargetRunner(config) mocked_get_fuzz_targets.return_value = ['target1', 'target2'] runner.initialize() testcase = os.path.join(workspace, 'testcase') self.fs.create_file(testcase) stacktrace = b'stacktrace' mocked_run_fuzz_target.return_value = fuzz_target.FuzzResult( testcase, stacktrace) magic_mock = mock.MagicMock() magic_mock.target_name = 'target1' mocked_create_fuzz_target_obj.return_value = magic_mock self.assertTrue(runner.run_fuzz_targets()) self.assertIn('target1-address-testcase', os.listdir(runner.artifacts_dir)) self.assertEqual(mocked_run_fuzz_target.call_count, 1) class BatchFuzzTargetRunnerTest(fake_filesystem_unittest.TestCase): """Tests that CiFuzzTargetRunner works as intended.""" def setUp(self): self.setUpPyfakefs() @mock.patch('utils.get_fuzz_targets') @mock.patch('run_fuzzers.BatchFuzzTargetRunner.run_fuzz_target') @mock.patch('run_fuzzers.BatchFuzzTargetRunner.create_fuzz_target_obj') def test_run_fuzz_targets_quits(self, mocked_create_fuzz_target_obj, mocked_run_fuzz_target, mocked_get_fuzz_targets): """Tests that run_fuzz_targets doesn't quit on the first crash it finds.""" workspace = 'workspace' out_path = os.path.join(workspace, 'out') self.fs.create_dir(out_path) config = _create_config(fuzz_seconds=FUZZ_SECONDS, workspace=workspace, project_name=EXAMPLE_PROJECT) runner = run_fuzzers.BatchFuzzTargetRunner(config) mocked_get_fuzz_targets.return_value = ['target1', 'target2'] runner.initialize() testcase1 = os.path.join(workspace, 'testcase-aaa') testcase2 = os.path.join(workspace, 'testcase-bbb') self.fs.create_file(testcase1) self.fs.create_file(testcase2) stacktrace = b'stacktrace' call_count = 0 def mock_run_fuzz_target(_): nonlocal call_count if call_count == 0: testcase = testcase1 elif call_count == 1: testcase = testcase2 assert call_count != 2 call_count += 1 return fuzz_target.FuzzResult(testcase, stacktrace) mocked_run_fuzz_target.side_effect = mock_run_fuzz_target magic_mock = mock.MagicMock() magic_mock.target_name = 'target1' mocked_create_fuzz_target_obj.return_value = magic_mock self.assertTrue(runner.run_fuzz_targets()) self.assertIn('target1-address-testcase-aaa', os.listdir(runner.artifacts_dir)) self.assertEqual(mocked_run_fuzz_target.call_count, 2) class RunAddressFuzzersIntegrationTest(RunFuzzerIntegrationTestMixin, unittest.TestCase): """Integration tests for build_fuzzers with an ASAN build.""" BUILD_DIR_NAME = 'cifuzz-latest-build' @unittest.skipIf(not os.getenv('INTEGRATION_TESTS'), 'INTEGRATION_TESTS=1 not set') def test_new_bug_found(self): """Tests run_fuzzers with a valid ASAN build.""" # Set the first return value to True, then the second to False to # emulate a bug existing in the current PR but not on the downloaded # OSS-Fuzz build. with mock.patch('fuzz_target.FuzzTarget.is_reproducible', side_effect=[True, False]): with tempfile.TemporaryDirectory() as tmp_dir: workspace = os.path.join(tmp_dir, 'workspace') shutil.copytree(TEST_DATA_PATH, workspace) config = _create_config(fuzz_seconds=FUZZ_SECONDS, workspace=workspace, project_name=EXAMPLE_PROJECT) result = run_fuzzers.run_fuzzers(config) self.assertEqual(result, run_fuzzers.RunFuzzersResult.BUG_FOUND) build_dir = os.path.join(workspace, 'out', self.BUILD_DIR_NAME) self.assertNotEqual(0, len(os.listdir(build_dir))) @unittest.skipIf(not os.getenv('INTEGRATION_TESTS'), 'INTEGRATION_TESTS=1 not set') @mock.patch('fuzz_target.FuzzTarget.is_reproducible', side_effect=[True, True]) def test_old_bug_found(self, _): """Tests run_fuzzers with a bug found in OSS-Fuzz before.""" config = _create_config(fuzz_seconds=FUZZ_SECONDS, workspace=TEST_DATA_PATH, project_name=EXAMPLE_PROJECT) with tempfile.TemporaryDirectory() as tmp_dir: workspace = os.path.join(tmp_dir, 'workspace') shutil.copytree(TEST_DATA_PATH, workspace) config = _create_config(fuzz_seconds=FUZZ_SECONDS, workspace=TEST_DATA_PATH, project_name=EXAMPLE_PROJECT) result = run_fuzzers.run_fuzzers(config) self.assertEqual(result, run_fuzzers.RunFuzzersResult.NO_BUG_FOUND) build_dir = os.path.join(TEST_DATA_PATH, 'out', self.BUILD_DIR_NAME) self.assertTrue(os.path.exists(build_dir)) self.assertNotEqual(0, len(os.listdir(build_dir))) def test_invalid_build(self): """Tests run_fuzzers with an invalid ASAN build.""" with tempfile.TemporaryDirectory() as tmp_dir: out_path = os.path.join(tmp_dir, 'out') os.mkdir(out_path) config = _create_config(fuzz_seconds=FUZZ_SECONDS, workspace=tmp_dir, project_name=EXAMPLE_PROJECT) result = run_fuzzers.run_fuzzers(config) self.assertEqual(result, run_fuzzers.RunFuzzersResult.ERROR) if __name__ == '__main__': unittest.main()