#!/usr/bin/env python3
# #############################################################################
# Copyright (c) 2018-present lzutao <taolzu(at)gmail.com>
# All rights reserved.
#
# This source code is licensed under both the BSD-style license (found in the
# LICENSE file in the root directory of this source tree) and the GPLv2 (found
# in the COPYING file in the root directory of this source tree).
# #############################################################################
import os
import subprocess
import tempfile


def _check_pipeline(producer_cmd, consumer_cmd, consumer_stdout=None):
    """Pipe the output of *producer_cmd* into *consumer_cmd* and check the consumer.

    Args:
        producer_cmd: argv list of the process whose stdout feeds the pipe.
        consumer_cmd: argv list of the process reading the pipe on its stdin.
        consumer_stdout: value passed as ``stdout=`` for the consumer
            (``None`` inherits the parent's stdout).

    Raises:
        subprocess.CalledProcessError: if the consumer exits with a non-zero
            status. The producer's exit status is not checked.
    """
    with subprocess.Popen(producer_cmd, stdout=subprocess.PIPE) as producer, \
            subprocess.Popen(consumer_cmd,
                             stdin=producer.stdout,
                             stdout=consumer_stdout) as consumer:
        # Drop the parent's copy of the read end so the producer can receive
        # a SIGPIPE if the consumer exits early.
        producer.stdout.close()
        consumer.communicate()
        if consumer.returncode != 0:
            # CalledProcessError requires (returncode, cmd); calling it with
            # fewer arguments raises TypeError and hides the real failure.
            raise subprocess.CalledProcessError(consumer.returncode,
                                                consumer.args)


def valgrindTest(valgrind, datagen, fuzzer, zstd, fullbench):
    """Run datagen, zstd, fuzzer and fullbench under valgrind.

    Every valgrind invocation uses ``--leak-check=full``,
    ``--show-leak-kinds=all`` and ``--error-exitcode=1``.

    Args:
        valgrind: path to the valgrind executable.
        datagen: path to the datagen executable.
        fuzzer: path to the fuzzer executable.
        zstd: path to the zstd executable.
        fullbench: path to the fullbench executable.

    Raises:
        subprocess.CalledProcessError: if a command that is expected to
            succeed exits with a non-zero status.
        subprocess.SubprocessError: if zstd invoked without arguments exits
            with status 0 although it is expected to fail.
    """
    VALGRIND_ARGS = [valgrind, '--leak-check=full', '--show-leak-kinds=all',
                     '--error-exitcode=1']

    print('\n ---- valgrind tests : memory analyzer ----')

    subprocess.check_call([*VALGRIND_ARGS, datagen, '-g50M'],
                          stdout=subprocess.DEVNULL)

    # This invocation is expected to exit non-zero; success is the error.
    if subprocess.call([*VALGRIND_ARGS, zstd],
                       stdout=subprocess.DEVNULL) == 0:
        raise subprocess.SubprocessError(
            'zstd without argument should have failed')

    _check_pipeline([datagen, '-g80'],
                    [*VALGRIND_ARGS, zstd, '-', '-c'],
                    consumer_stdout=subprocess.DEVNULL)

    _check_pipeline([datagen, '-g16KB'],
                    [*VALGRIND_ARGS, zstd, '-vf', '-', '-c'],
                    consumer_stdout=subprocess.DEVNULL)

    with tempfile.NamedTemporaryFile() as tmp_fd:
        # Write zstd's output to the temporary file, then run zstd again on
        # that same file. zstd's stdout is inherited in the first step.
        _check_pipeline([datagen, '-g2930KB'],
                        [*VALGRIND_ARGS, zstd, '-5', '-vf', '-', '-o',
                         tmp_fd.name])

        subprocess.check_call(
            [*VALGRIND_ARGS, zstd, '-vdf', tmp_fd.name, '-c'],
            stdout=subprocess.DEVNULL)

    _check_pipeline([datagen, '-g64MB'],
                    [*VALGRIND_ARGS, zstd, '-vf', '-', '-c'],
                    consumer_stdout=subprocess.DEVNULL)

    subprocess.check_call([*VALGRIND_ARGS, fuzzer, '-T1mn', '-t1'])
    subprocess.check_call([*VALGRIND_ARGS, fullbench, '-i1'])


def main():
    """Parse the five tool paths from the command line and run the tests."""
    import argparse
    parser = argparse.ArgumentParser(
        description='Valgrind tests : memory analyzer')
    parser.add_argument('valgrind', help='valgrind path')
    parser.add_argument('zstd', help='zstd path')
    parser.add_argument('datagen', help='datagen path')
    parser.add_argument('fuzzer', help='fuzzer path')
    parser.add_argument('fullbench', help='fullbench path')

    args = parser.parse_args()

    # Note: the positional order on the command line (valgrind, zstd,
    # datagen, ...) differs from valgrindTest's parameter order.
    valgrindTest(args.valgrind, args.datagen, args.fuzzer, args.zstd,
                 args.fullbench)


if __name__ == '__main__':
    main()