#!/usr/bin/env python # Copyright 2014 The Chromium Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. """Mutational ClusterFuzz fuzzer. A pre-built corpus of ipcdump files has to be uploaded to ClusterFuzz along with this script. As chrome is being developed, the corpus will become out-of-date and needs to be updated. This fuzzer will pick some ipcdumps from the corpus, concatenate them with ipc_message_util and mutate the result with ipc_fuzzer_mutate. """ import argparse import os import random import string import subprocess import sys import tempfile import time # Number of ipcdumps to concatenate NUM_IPCDUMPS = 50 def create_temp_file(): temp_file = tempfile.NamedTemporaryFile(delete=False) temp_file.close() return temp_file.name def random_id(size=16, chars=string.ascii_lowercase): return ''.join(random.choice(chars) for x in range(size)) def random_ipcdump_path(ipcdump_dir): return os.path.join(ipcdump_dir, 'fuzz-' + random_id() + '.ipcdump') class MutationalFuzzer: def parse_cf_args(self): parser = argparse.ArgumentParser() parser.add_argument('--input_dir') parser.add_argument('--output_dir') parser.add_argument('--no_of_files', type=int) self.args = args = parser.parse_args(); if not args.input_dir or not args.output_dir or not args.no_of_files: parser.print_help() sys.exit(1) def get_paths(self): app_path_key = 'APP_PATH' self.util_binary = 'ipc_message_util' self.mutate_binary = 'ipc_fuzzer_mutate' if app_path_key not in os.environ: sys.exit('Env var %s should be set to chrome path' % app_path_key) chrome_path = os.environ[app_path_key] out_dir = os.path.dirname(chrome_path) self.util_path = os.path.join(out_dir, self.util_binary) self.mutate_path = os.path.join(out_dir, self.mutate_binary) def list_corpus(self): input_dir = self.args.input_dir entries = os.listdir(input_dir) entries = [i for i in entries if i.endswith('.ipcdump')] self.corpus = [os.path.join(input_dir, entry) for entry in entries] def create_mutated_ipcdump(self): ipcdumps = ','.join(random.sample(self.corpus, NUM_IPCDUMPS)) tmp_ipcdump = create_temp_file() mutated_ipcdump = random_ipcdump_path(self.args.output_dir) # concatenate ipcdumps -> tmp_ipcdump cmd = [self.util_path, ipcdumps, tmp_ipcdump] if subprocess.call(cmd): sys.exit('%s failed' % self.util_binary) # mutate tmp_ipcdump -> mutated_ipcdump cmd = [self.mutate_path, tmp_ipcdump, mutated_ipcdump] if subprocess.call(cmd): sys.exit('%s failed' % self.mutate_binary) os.remove(tmp_ipcdump) def main(self): self.parse_cf_args() self.get_paths() self.list_corpus() for i in xrange(self.args.no_of_files): self.create_mutated_ipcdump() return 0 if __name__ == "__main__": fuzzer = MutationalFuzzer() sys.exit(fuzzer.main())