1# 2# Copyright (C) 2016 The Android Open Source Project 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); 5# you may not use this file except in compliance with the License. 6# You may obtain a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13# See the License for the specific language governing permissions and 14# limitations under the License. 15# 16 17import logging 18import os 19 20from vts.runners.host import asserts 21from vts.runners.host import base_test 22from vts.runners.host import const 23from vts.runners.host import keys 24from vts.runners.host import test_runner 25from vts.utils.python.controllers import adb 26from vts.utils.python.controllers import android_device 27from vts.utils.python.common import list_utils 28from vts.utils.python.os import path_utils 29 30from vts.testcases.template.llvmfuzzer_test import llvmfuzzer_test_config as config 31 32class LLVMFuzzerTest(base_test.BaseTestClass): 33 """Runs fuzzer tests on target. 34 35 Attributes: 36 _dut: AndroidDevice, the device under test as config 37 _testcases: string list, list of testcases to run 38 """ 39 def setUpClass(self): 40 """Creates a remote shell instance, and copies data files.""" 41 required_params = [ 42 keys.ConfigKeys.IKEY_DATA_FILE_PATH, 43 config.ConfigKeys.FUZZER_CONFIGS 44 ] 45 self.getUserParams(required_params) 46 47 self._testcases = map(lambda x: str(x), self.fuzzer_configs.keys()) 48 49 logging.info("Testcases: %s", self._testcases) 50 logging.info("%s: %s", keys.ConfigKeys.IKEY_DATA_FILE_PATH, 51 self.data_file_path) 52 logging.info("%s: %s", config.ConfigKeys.FUZZER_CONFIGS, 53 self.fuzzer_configs) 54 55 self._dut = self.registerController(android_device, False)[0] 56 self._dut.adb.shell("mkdir %s -p" % config.FUZZER_TEST_DIR) 57 58 def tearDownClass(self): 59 """Deletes all copied data.""" 60 self._dut.adb.shell("rm -rf %s" % config.FUZZER_TEST_DIR) 61 62 def PushFiles(self, testcase): 63 """adb pushes testcase file to target. 64 65 Args: 66 testcase: string, path to executable fuzzer. 67 """ 68 push_src = os.path.join(self.data_file_path, config.FUZZER_SRC_DIR, testcase) 69 self._dut.adb.push("%s %s" % (push_src, config.FUZZER_TEST_DIR)) 70 logging.info("Adb pushed: %s", testcase) 71 72 def CreateFuzzerFlags(self, fuzzer_config): 73 """Creates flags for the fuzzer executable. 74 75 Args: 76 fuzzer_config: dict, contains configuration for the fuzzer. 77 78 Returns: 79 string, command line flags for fuzzer executable. 80 """ 81 def _SerializeVTSFuzzerParams(params): 82 """Creates VTS command line flags for fuzzer executable. 83 84 Args: 85 params: dict, contains flags and their values. 86 87 Returns: 88 string, of form "--<flag0>=<val0> --<flag1>=<val1> ... " 89 """ 90 VTS_SPEC_FILES = "vts_spec_files" 91 VTS_EXEC_SIZE = "vts_exec_size" 92 DELIMITER = ":" 93 94 # vts_spec_files is a string list, will be serialized like this: 95 # [a, b, c] -> "a:b:c" 96 vts_spec_files = params.get(VTS_SPEC_FILES, {}) 97 target_vts_spec_files = DELIMITER.join(map( 98 lambda x: path_utils.JoinTargetPath(config.FUZZER_SPEC_DIR, x), 99 vts_spec_files)) 100 flags = "--%s=\"%s\" " % (VTS_SPEC_FILES, target_vts_spec_files) 101 102 vts_exec_size = params.get(VTS_EXEC_SIZE, {}) 103 flags += "--%s=%s" % (VTS_EXEC_SIZE, vts_exec_size) 104 return flags 105 106 def _SerializeLLVMFuzzerParams(params): 107 """Creates LLVM libfuzzer command line flags for fuzzer executable. 108 109 Args: 110 params: dict, contains flags and their values. 111 112 Returns: 113 string, of form "--<flag0>=<val0> --<flag1>=<val1> ... " 114 """ 115 return " ".join(["-%s=%s" % (k, v) for k, v in params.items()]) 116 117 118 vts_fuzzer_params = fuzzer_config.get("vts_fuzzer_params", {}) 119 120 llvmfuzzer_params = config.FUZZER_PARAMS.copy() 121 llvmfuzzer_params.update(fuzzer_config.get("llvmfuzzer_params", {})) 122 123 vts_fuzzer_flags = _SerializeVTSFuzzerParams(vts_fuzzer_params) 124 llvmfuzzer_flags = _SerializeLLVMFuzzerParams(llvmfuzzer_params) 125 126 return vts_fuzzer_flags + " -- " + llvmfuzzer_flags 127 128 def CreateCorpus(self, fuzzer, fuzzer_config): 129 """Creates a corpus directory on target. 130 131 Args: 132 fuzzer: string, name of the fuzzer executable. 133 fuzzer_config: dict, contains configuration for the fuzzer. 134 135 Returns: 136 string, path to corpus directory on the target. 137 """ 138 corpus = fuzzer_config.get("corpus", []) 139 corpus_dir = path_utils.JoinTargetPath( 140 config.FUZZER_TEST_DIR, "%s_corpus" % fuzzer) 141 142 self._dut.adb.shell("mkdir %s -p" % corpus_dir) 143 for idx, corpus_entry in enumerate(corpus): 144 corpus_entry = corpus_entry.replace("x", "\\x") 145 corpus_entry_file = path_utils.JoinTargetPath( 146 corpus_dir, "input%s" % idx) 147 cmd = "echo -ne '%s' > %s" % (str(corpus_entry), corpus_entry_file) 148 # Vts shell drive doesn't play nicely with escape characters, 149 # so we use adb shell. 150 self._dut.adb.shell("\"%s\"" % cmd) 151 152 return corpus_dir 153 154 def RunTestcase(self, fuzzer): 155 """Runs the given testcase and asserts the result. 156 157 Args: 158 fuzzer: string, name of fuzzer executable. 159 """ 160 self.PushFiles(fuzzer) 161 162 fuzzer_config = self.fuzzer_configs.get(fuzzer, {}) 163 test_flags = self.CreateFuzzerFlags(fuzzer_config) 164 corpus_dir = self.CreateCorpus(fuzzer, fuzzer_config) 165 166 chmod_cmd = "chmod -R 755 %s" % path_utils.JoinTargetPath( 167 config.FUZZER_TEST_DIR, fuzzer) 168 self._dut.adb.shell(chmod_cmd) 169 170 cd_cmd = "cd %s" % config.FUZZER_TEST_DIR 171 ld_path = "LD_LIBRARY_PATH=/data/local/tmp/64:/data/local/tmp/32:$LD_LIBRARY_PATH" 172 test_cmd = "./%s" % fuzzer 173 174 fuzz_cmd = "%s && %s %s %s %s > /dev/null" % ( 175 cd_cmd, ld_path, test_cmd, corpus_dir, test_flags) 176 logging.info("Executing: %s", fuzz_cmd) 177 # TODO(trong): vts shell doesn't handle timeouts properly, change this after it does. 178 try: 179 stdout = self._dut.adb.shell("'%s'" % fuzz_cmd) 180 result = { 181 const.STDOUT: stdout, 182 const.STDERR: "", 183 const.EXIT_CODE: 0 184 } 185 except adb.AdbError as e: 186 result = { 187 const.STDOUT: e.stdout, 188 const.STDERR: e.stderr, 189 const.EXIT_CODE: e.ret_code 190 } 191 self.AssertTestResult(fuzzer, result) 192 193 def LogCrashReport(self, fuzzer): 194 """Logs crash-causing fuzzer input. 195 196 Reads the crash report file and logs the contents in format: 197 "\x01\x23\x45\x67\x89\xab\xcd\xef" 198 199 Args: 200 fuzzer: string, name of fuzzer executable. 201 """ 202 cmd = "xxd -p %s" % config.FUZZER_TEST_CRASH_REPORT 203 204 # output is string of a hexdump from crash report file. 205 # From the example above, output would be "0123456789abcdef". 206 output = self._dut.adb.shell(cmd) 207 remove_chars = ["\r", "\t", "\n", " "] 208 for char in remove_chars: 209 output = output.replace(char, "") 210 211 crash_report = "" 212 # output is guaranteed to be even in length since its a hexdump. 213 for offset in xrange(0, len(output), 2): 214 crash_report += "\\x%s" % output[offset:offset + 2] 215 216 logging.info('FUZZER_TEST_CRASH_REPORT for %s: "%s"', fuzzer, crash_report) 217 218 # TODO(trong): differentiate between crashes and sanitizer rule violations. 219 def AssertTestResult(self, fuzzer, result): 220 """Asserts that testcase finished as expected. 221 222 Checks that device is in responsive state. If not, waits for boot 223 then reports test as failure. If it is, asserts that all test commands 224 returned exit code 0. 225 226 Args: 227 fuzzer: string, name of fuzzer executable. 228 result: dict(str, str, int), command results from shell. 229 """ 230 logging.info("Test result: %s" % result) 231 if not self._dut.hasBooted(): 232 self._dut.waitForBootCompletion() 233 asserts.fail("%s left the device in unresponsive state." % fuzzer) 234 235 exit_code = result[const.EXIT_CODE] 236 if exit_code == config.ExitCode.FUZZER_TEST_FAIL: 237 self.LogCrashReport(fuzzer) 238 asserts.fail("%s failed normally." % fuzzer) 239 elif exit_code != config.ExitCode.FUZZER_TEST_PASS: 240 asserts.fail("%s failed abnormally." % fuzzer) 241 242 def generateFuzzerTests(self): 243 """Runs fuzzer tests.""" 244 self.runGeneratedTests( 245 test_func=self.RunTestcase, 246 settings=self._testcases, 247 name_func=lambda x: x.split("/")[-1]) 248 249 250if __name__ == "__main__": 251 test_runner.main() 252