• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
#!/usr/bin/env python

"""
This is a generic fuzz testing tool; see --help for more information.
"""
6
7import os
8import sys
9import random
10import subprocess
11import itertools
12
class TestGenerator:
    """Enumerate single-edit fuzzing tests over a set of input files.

    Every test is identified by an integer index in ``[0, num_tests)`` and
    decoded by :meth:`get_test` into a tuple
    ``(kind, position, insert_str, picked_position)`` where ``kind`` is one
    of ``'nothing'``, ``'delete'``, ``'insert'`` or ``'replace'``.

    Attributes:
        inputs: list of ``(path, contents)`` pairs, one per input file.
        delete, insert, replace: which edit kinds are enabled.
        pick_input: whether each test also selects an input position.
        insert_strings: strings used by 'insert' and 'replace' edits.
        num_positions: total number of characters across all inputs.
        num_insert_strings: ``len(insert_strings)``.
        num_tests: number of valid test indices.
    """

    def __init__(self, inputs, delete, insert, replace,
                 insert_strings, pick_input):
        # Read in binary mode so the cached contents match, byte for byte,
        # what TestApplication later writes back with mode 'wb'. The 'with'
        # block closes each handle deterministically.
        self.inputs = []
        for path in inputs:
            with open(path, 'rb') as f:
                self.inputs.append((path, f.read()))

        self.delete = bool(delete)
        self.insert = bool(insert)
        self.replace = bool(replace)
        self.pick_input = bool(pick_input)
        self.insert_strings = list(insert_strings)

        self.num_positions = sum(len(d) for _, d in self.inputs)
        # Measure the materialised list: the argument may be an iterator
        # that list() has already consumed.
        self.num_insert_strings = len(self.insert_strings)

        # Per position: one delete, plus one insert and one replace for each
        # insert string. The extra +1 is the single 'nothing' test.
        edits_per_position = (int(self.delete) +
                              (int(self.insert) + int(self.replace)) *
                              self.num_insert_strings)
        self.num_tests = edits_per_position * self.num_positions + 1

        # With input picking, every test is paired with every position.
        if self.pick_input:
            self.num_tests *= self.num_positions

    def position_to_source_index(self, position):
        """Map a global position to ``(input index, offset in that input)``.

        Raises:
            ValueError: if ``position`` is past the end of the last input.
        """
        for i, (_, d) in enumerate(self.inputs):
            n = len(d)
            if position < n:
                return (i, position)
            position -= n
        raise ValueError('Invalid position.')

    def get_test(self, index):
        """Decode a test index into ``(kind, position, string, picked)``.

        ``position`` and ``picked`` are ``(input index, offset)`` pairs or
        None; ``string`` is the insert/replace string or None.
        """
        assert 0 <= index < self.num_tests

        # The lowest "digit" of the index selects the picked input position.
        picked_position = None
        if self.pick_input:
            index, picked_position = divmod(index, self.num_positions)
            picked_position = self.position_to_source_index(picked_position)

        if index == 0:
            return ('nothing', None, None, picked_position)

        # Next digit: the position the edit is applied at.
        index -= 1
        index, position = divmod(index, self.num_positions)
        position = self.position_to_source_index(position)
        if self.delete:
            if index == 0:
                return ('delete', position, None, picked_position)
            index -= 1

        # Remaining digits: which insert string, then insert vs. replace.
        index, insert_index = divmod(index, self.num_insert_strings)
        insert_str = self.insert_strings[insert_index]
        if self.insert:
            if index == 0:
                return ('insert', position, insert_str, picked_position)
            index -= 1

        assert self.replace
        assert index == 0
        return ('replace', position, insert_str, picked_position)
70
class TestApplication:
    """Apply one fuzzing test to its input file on disk, and undo it.

    ``tg`` must expose ``inputs`` as a list of ``(path, contents)`` pairs.
    ``test`` is a tuple ``(kind, position, insert_str, picked_position)``
    where ``position`` is an ``(input index, offset)`` pair.
    """

    def __init__(self, tg, test):
        self.tg = tg
        self.test = test

    def _write(self, name, data):
        """Overwrite ``name`` with ``data``, closing the file before returning."""
        # An explicit close guarantees the data is on disk before the test
        # command is launched against the file.
        with open(name, 'wb') as f:
            f.write(data)

    def apply(self):
        """Write the fuzzed variant of the targeted input file.

        Raises:
            ValueError: if the test kind is not recognised.
        """
        kind = self.test[0]
        if kind == 'nothing':
            return

        i, j = self.test[1]
        name, data = self.tg.inputs[i]
        if kind == 'delete':
            data = data[:j] + data[j+1:]
        elif kind == 'insert':
            data = data[:j] + self.test[2] + data[j:]
        elif kind == 'replace':
            data = data[:j] + self.test[2] + data[j+1:]
        else:
            # Wrap in a 1-tuple: self.test is itself a tuple, and '%' would
            # otherwise try to spread its items over the single placeholder.
            raise ValueError('Invalid test %r' % (self.test,))
        self._write(name, data)

    def revert(self):
        """Restore the targeted input file to its cached original contents."""
        if self.test[0] != 'nothing':
            i, _ = self.test[1]
            name, data = self.tg.inputs[i]
            self._write(name, data)
97
def quote(str):
    """Return ``str`` wrapped in double quotes (no escaping is performed)."""
    return ''.join(('"', str, '"'))
100
def _line_and_column(data, pos):
    """Return the 1-based ``(line, column)`` of offset ``pos`` in ``data``."""
    line = data.count('\n', 0, pos) + 1
    # rfind() yields -1 when no newline precedes pos, which makes the column
    # pos + 1 on the first line.
    column = pos - data.rfind('\n', 0, pos)
    return line, column


def run_one_test(test_application, index, input_files, args):
    """Run the test command once and report whether it passed.

    Reads the module-level ``opts`` set by main() (verbose, log_dir,
    log_all, succinct, expected_exit_code, extra_exit_codes).

    Args:
        test_application: object whose ``test`` attribute is the decoded
            test tuple and whose ``tg.inputs`` holds the input contents.
        index: the test index, used for interpolation, log names and output.
        input_files: the input file paths.
        args: the command line; each argument is %-interpolated with the
            options dictionary built below before execution.

    Returns:
        True if the exit code equals ``opts.expected_exit_code`` or is in
        ``opts.extra_exit_codes``, False otherwise.
    """
    test = test_application.test

    # Interpolate arguments.
    options = {'index': index,
               'inputs': ' '.join(quote(f) for f in input_files)}

    # Add picked input interpolation arguments, if used.
    picked = test[3]
    if picked is not None:
        file_index, pos = picked
        file_data = test_application.tg.inputs[file_index][1]
        line, column = _line_and_column(file_data, pos)
        options['picked_input'] = input_files[file_index]
        options['picked_input_pos'] = pos
        options['picked_input_line'] = line
        options['picked_input_col'] = column

    test_args = [a % options for a in args]
    if opts.verbose:
        print('%s: note: executing %r' % (sys.argv[0], test_args))

    stdout = None
    stderr = None
    stdout_log_path = stderr_log_path = None
    try:
        if opts.log_dir:
            stdout_log_path = os.path.join(opts.log_dir, '%s.out' % index)
            stderr_log_path = os.path.join(opts.log_dir, '%s.err' % index)
            stdout = open(stdout_log_path, 'wb')
            stderr = open(stderr_log_path, 'wb')
        else:
            # The child shares our stdout; flush so output stays ordered.
            sys.stdout.flush()
        p = subprocess.Popen(test_args, stdout=stdout, stderr=stderr)
        # No pipes are attached, so wait() alone is sufficient.
        exit_code = p.wait()
    finally:
        # Close the logs even when the command could not be started.
        for log in (stdout, stderr):
            if log is not None:
                log.close()

    test_result = (exit_code == opts.expected_exit_code or
                   exit_code in opts.extra_exit_codes)

    # Remove the logs for passes, unless logging all results.
    if stdout_log_path is not None and not opts.log_all and test_result:
        os.remove(stdout_log_path)
        os.remove(stderr_log_path)

    if not test_result:
        print('FAIL: %d' % index)
    elif not opts.succinct:
        print('PASS: %d' % index)
    return test_result
160
def main():
    """Parse the command line and run the selected fuzzing tests.

    Stores the parsed options in the module-level ``opts``. Exits through
    ``parser.error`` on invalid usage, with status 1 if the log directory
    cannot be created, and with status 1 on the first failure when
    '--stop-on-fail' is given.
    """
    global opts
    from optparse import OptionParser, OptionGroup
    parser = OptionParser("""%prog [options] ... test command args ...

%prog is a tool for fuzzing inputs and testing them.

The most basic usage is something like:

  $ %prog --file foo.txt ./test.sh

which will run a default list of fuzzing strategies on the input. For each
fuzzed input, it will overwrite the input files (in place), run the test script,
then restore the files back to their original contents.

NOTE: You should make sure you have a backup copy of your inputs, in case
something goes wrong!!!

You can cause the fuzzing to not restore the original files with
'--no-revert'. Generally this is used with '--test <index>' to run one failing
test and then leave the fuzzed inputs in place to examine the failure.

For each fuzzed input, %prog will run the test command given on the command
line. Each argument in the command is subject to string interpolation before
being executed. The syntax is "%(VARIABLE)FORMAT" where FORMAT is a standard
printf format, and VARIABLE is one of:

  'index' - the test index being run
  'inputs' - the full list of test inputs
  'picked_input'      - (with --pick-input) the selected input file
  'picked_input_pos'  - (with --pick-input) the selected input position
  'picked_input_line' - (with --pick-input) the selected input line
  'picked_input_col'  - (with --pick-input) the selected input column

By default, the script will run forever continually picking new tests to
run. You can limit the number of tests that are run with '--max-tests <number>',
and you can run a particular test with '--test <index>'.

You can specify '--stop-on-fail' to stop the script on the first failure
without reverting the changes.

""")
    parser.add_option("-v", "--verbose", help="Show more output",
                      action='store_true', dest="verbose", default=False)
    parser.add_option("-s", "--succinct",  help="Reduce amount of output",
                      action="store_true", dest="succinct", default=False)

    group = OptionGroup(parser, "Test Execution")
    group.add_option("", "--expected-exit-code", help="Set expected exit code",
                     type=int, dest="expected_exit_code",
                     default=0)
    group.add_option("", "--extra-exit-code",
                     help="Set additional expected exit code",
                     type=int, action="append", dest="extra_exit_codes",
                     default=[])
    group.add_option("", "--log-dir",
                     help="Capture test logs to an output directory",
                     type=str, dest="log_dir",
                     default=None)
    group.add_option("", "--log-all",
                     help="Log all outputs (not just failures)",
                     action="store_true", dest="log_all", default=False)
    parser.add_option_group(group)

    group = OptionGroup(parser, "Input Files")
    group.add_option("", "--file", metavar="PATH",
                     help="Add an input file to fuzz",
                     type=str, action="append", dest="input_files", default=[])
    group.add_option("", "--filelist", metavar="LIST",
                     help="Add a list of inputs files to fuzz (one per line)",
                     type=str, action="append", dest="filelists", default=[])
    parser.add_option_group(group)

    group = OptionGroup(parser, "Fuzz Options")
    # The backslash is written as an explicit escape; the value is unchanged.
    group.add_option("", "--replacement-chars", dest="replacement_chars",
                     help="Characters to insert/replace",
                     default="0{}[]<>\\;@#$^%& ")
    group.add_option("", "--replacement-string", dest="replacement_strings",
                     action="append", help="Add a replacement string to use",
                     default=[])
    group.add_option("", "--replacement-list", dest="replacement_lists",
                     help="Add a list of replacement strings (one per line)",
                     action="append", default=[])
    group.add_option("", "--no-delete", help="Don't delete characters",
                     action='store_false', dest="enable_delete", default=True)
    group.add_option("", "--no-insert", help="Don't insert strings",
                     action='store_false', dest="enable_insert", default=True)
    group.add_option("", "--no-replace", help="Don't replace strings",
                     action='store_false', dest="enable_replace", default=True)
    group.add_option("", "--no-revert", help="Don't revert changes",
                     action='store_false', dest="revert", default=True)
    group.add_option("", "--stop-on-fail", help="Stop on first failure",
                     action='store_true', dest="stop_on_fail", default=False)
    parser.add_option_group(group)

    group = OptionGroup(parser, "Test Selection")
    group.add_option("", "--test", help="Run a particular test",
                     type=int, dest="test", default=None, metavar="INDEX")
    group.add_option("", "--max-tests", help="Maximum number of tests",
                     type=int, dest="max_tests", default=None, metavar="COUNT")
    group.add_option("", "--pick-input",
                     help="Randomly select an input byte as well as fuzzing",
                     action='store_true', dest="pick_input", default=False)
    parser.add_option_group(group)

    # Everything after the first positional argument belongs to the test
    # command, including anything that looks like an option.
    parser.disable_interspersed_args()

    (opts, args) = parser.parse_args()

    if not args:
        parser.error("Invalid number of arguments")

    # Collect the list of inputs.
    input_files = list(opts.input_files)
    for filelist in opts.filelists:
        with open(filelist) as f:
            for ln in f:
                ln = ln.strip()
                if ln:
                    input_files.append(ln)
    input_files.sort()

    if not input_files:
        parser.error("No input files!")

    print('%s: note: fuzzing %d files.' % (sys.argv[0], len(input_files)))

    # Make sure the log directory exists if used.
    if opts.log_dir:
        if not os.path.exists(opts.log_dir):
            try:
                os.mkdir(opts.log_dir)
            except OSError:
                print("%s: error: log directory couldn't be created!" % (
                    sys.argv[0],))
                raise SystemExit(1)

    # Get the list of insert/replacement strings.
    replacements = list(opts.replacement_chars)
    replacements.extend(opts.replacement_strings)
    for replacement_list in opts.replacement_lists:
        with open(replacement_list) as f:
            for ln in f:
                # Drop only the line terminator: other whitespace may be
                # part of the replacement, and a final line without a
                # newline must keep its last character.
                if ln.endswith('\n'):
                    ln = ln[:-1]
                if ln:
                    replacements.append(ln)

    # Unique and order the replacement list.
    replacements = sorted(set(replacements))

    # Create the test generator.
    tg = TestGenerator(input_files, opts.enable_delete, opts.enable_insert,
                       opts.enable_replace, replacements, opts.pick_input)

    print('%s: note: %d input bytes.' % (sys.argv[0], tg.num_positions))
    print('%s: note: %d total tests.' % (sys.argv[0], tg.num_tests))
    if opts.test is not None:
        # Reject a bad index up front instead of tripping an assertion
        # inside the generator.
        if not 0 <= opts.test < tg.num_tests:
            parser.error("test index must be in the range [0, %d)" %
                         tg.num_tests)
        it = [opts.test]
    elif opts.max_tests is not None:
        it = (random.randrange(tg.num_tests)
              for _ in itertools.repeat(None, opts.max_tests))
    else:
        it = (random.randrange(tg.num_tests)
              for _ in itertools.repeat(None))
    for test in it:
        t = tg.get_test(test)

        if opts.verbose:
            print('%s: note: running test %d: %r' % (sys.argv[0], test, t))
        ta = TestApplication(tg, t)
        try:
            ta.apply()
            test_result = run_one_test(ta, test, input_files, args)
            if not test_result and opts.stop_on_fail:
                # Leave the failing input in place for inspection: the
                # 'finally' below consults opts.revert while exiting.
                opts.revert = False
                sys.exit(1)
        finally:
            if opts.revert:
                ta.revert()

        sys.stdout.flush()
348
# Run the fuzzer only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
351