#!/usr/bin/env python3
#
# Copyright 2018 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
#

"""A tool for running puffin tests in a corpus of deflate compressed files."""

import argparse
import filecmp
import logging
import os
import subprocess
import sys
import tempfile

_PUFFHUFF = 'puffhuff'
_PUFFDIFF = 'puffdiff'
TESTS = (_PUFFHUFF, _PUFFDIFF)


class Error(Exception):
  """Puffin general processing error."""


def ParseArguments(argv):
  """Parses and Validates command line arguments.

  Args:
    argv: command line arguments to parse (without the program name).

  Returns:
    The parsed arguments namespace with the attributes corpus,
    disabled_tests, cache_size and debug.

  Raises:
    Error: if the corpus argument is not an existing directory.
  """
  parser = argparse.ArgumentParser()

  parser.add_argument('corpus', metavar='CORPUS',
                      help='A corpus directory containing compressed files')
  parser.add_argument('-d', '--disabled_tests', default=(), metavar='',
                      nargs='*',
                      help=('Space separated list of tests to disable. '
                            'Allowed options include: ' + ', '.join(TESTS)),
                      choices=TESTS)
  parser.add_argument('--cache_size', type=int, metavar='SIZE',
                      help='The size (in bytes) of the cache for puffpatch '
                      'operations.')
  parser.add_argument('--debug', action='store_true',
                      help='Turns on verbosity.')

  # Parse command-line arguments.
  args = parser.parse_args(argv)

  if not os.path.isdir(args.corpus):
    raise Error('Corpus directory {} is non-existent or inaccessible'
                .format(args.corpus))
  return args


def _RunPuffin(operation, cmd):
  """Runs a puffin command line and raises Error on a non-zero exit code.

  Args:
    operation: name of the puffin operation, used only in the error message.
    cmd: the full command line (list of strings) to execute.

  Raises:
    Error: if the command exits with a non-zero status.
  """
  if subprocess.call(cmd) != 0:
    raise Error('Puffin failed to do {} command: {}'
                .format(operation, cmd))


def _TestPuffhuff(src):
  """Runs the puffhuff operation on |src| and checks the output equals |src|.

  Args:
    src: path of the corpus file to process.

  Raises:
    Error: if puffin fails or the generated file differs from |src|.
  """
  with tempfile.NamedTemporaryFile() as tgt_file:
    operation = 'puffhuff'
    logging.debug('Running %s on %s', operation, src)
    _RunPuffin(operation,
               ['puffin',
                '--operation={}'.format(operation),
                '--src_file={}'.format(src),
                '--dst_file={}'.format(tgt_file.name)])
    # shallow=False forces a byte-by-byte comparison; the default shallow mode
    # reports files as equal when only their stat signatures match.
    if not filecmp.cmp(src, tgt_file.name, shallow=False):
      raise Error('The generated file {} is not equivalent to the original '
                  'file {} after {} operation.'
                  .format(tgt_file.name, src, operation))


def _TestPuffdiff(src, tgt, cache_size=None):
  """Runs puffdiff from |src| to |tgt|, then puffpatch, and checks the result.

  Args:
    src: path of the source corpus file.
    tgt: path of the target corpus file.
    cache_size: value for puffin's --cache_size flag on the puffpatch step, or
        None to not pass the flag at all.

  Raises:
    Error: if either puffin command fails or the patched file differs from
        |tgt|.
  """
  with tempfile.NamedTemporaryFile() as patch, \
       tempfile.NamedTemporaryFile() as new_tgt:
    operation = 'puffdiff'
    logging.debug('Running %s on %s (%d) and %s (%d)',
                  operation,
                  os.path.basename(src), os.stat(src).st_size,
                  os.path.basename(tgt), os.stat(tgt).st_size)
    # Running the puffdiff operation
    _RunPuffin(operation,
               ['puffin',
                '--operation={}'.format(operation),
                '--src_file={}'.format(src),
                '--dst_file={}'.format(tgt),
                '--patch_file={}'.format(patch.name)])

    logging.debug('Patch size is: %d', os.stat(patch.name).st_size)

    operation = 'puffpatch'
    logging.debug('Running %s on src file %s and patch %s',
                  operation, os.path.basename(src), patch.name)
    cmd = ['puffin',
           '--operation={}'.format(operation),
           '--src_file={}'.format(src),
           '--dst_file={}'.format(new_tgt.name),
           '--patch_file={}'.format(patch.name)]
    # Compare against None so that an explicit --cache_size 0 is forwarded
    # instead of being silently dropped.
    if cache_size is not None:
      cmd += ['--cache_size={}'.format(cache_size)]

    # Running the puffpatch operation
    _RunPuffin(operation, cmd)

    # Byte-by-byte comparison; see the note in _TestPuffhuff.
    if not filecmp.cmp(tgt, new_tgt.name, shallow=False):
      raise Error('The generated file {} is not equivalent to the '
                  'original file {} after puffpatch operation.'
                  .format(new_tgt.name, tgt))


def main(argv):
  """The main function.

  Args:
    argv: the full command line, including the program name at index 0.

  Returns:
    0 when every enabled test passed.

  Raises:
    Error: on invalid corpus directory, a failing puffin command, or a
        mismatch between generated and expected files.
  """
  args = ParseArguments(argv[1:])

  if args.debug:
    logging.getLogger().setLevel(logging.DEBUG)

  # Construct list of appropriate files (regular files directly inside the
  # corpus). Sorted so the test order, and therefore the first reported
  # failure, is the same on every run.
  candidates = (os.path.join(args.corpus, f) for f in os.listdir(args.corpus))
  files = sorted(path for path in candidates if os.path.isfile(path))

  # For each file in corpus run puffhuff.
  if _PUFFHUFF not in args.disabled_tests:
    for src in files:
      _TestPuffhuff(src)

  # Run puffdiff and puffpatch for each pairs of files in the corpus.
  if _PUFFDIFF not in args.disabled_tests:
    for src in files:
      for tgt in files:
        _TestPuffdiff(src, tgt, cache_size=args.cache_size)

  return 0


if __name__ == '__main__':
  sys.exit(main(sys.argv))