"""Minimal test runner: discovers marked test functions and data files,
runs each function against the matching files, and prints a report."""

import os
import os.path
import pprint
import sys
import traceback
import types

# Directory scanned for test data files (relative to the working directory).
DATA = 'tests/data'


def find_test_functions(collections):
    """Return the test functions found in *collections*.

    *collections* is a single collection or a list of them.  A collection is
    either a dict or any object accepted by ``vars()``.  A value counts as a
    test function when it is a plain Python function carrying a ``unittest``
    attribute.  Within each collection, functions are returned in sorted
    key order.
    """
    if not isinstance(collections, list):
        collections = [collections]
    functions = []
    for collection in collections:
        if not isinstance(collection, dict):
            collection = vars(collection)
        for key in sorted(collection):
            value = collection[key]
            if isinstance(value, types.FunctionType) and hasattr(value, 'unittest'):
                functions.append(value)
    return functions


def find_test_filenames(directory):
    """Group the regular files in *directory* by base name.

    Returns a list of ``(base, [ext, ...])`` pairs sorted by base name, where
    ``base`` and ``ext`` come from ``os.path.splitext``.  Files whose base
    name ends with ``-py2`` are ignored.
    """
    filenames = {}
    for filename in os.listdir(directory):
        if not os.path.isfile(os.path.join(directory, filename)):
            continue
        base, ext = os.path.splitext(filename)
        if base.endswith('-py2'):
            continue
        filenames.setdefault(base, []).append(ext)
    return sorted(filenames.items())


def parse_arguments(args):
    """Split command-line style *args* into test selection settings.

    *args* may be any iterable of strings, or ``None`` to use
    ``sys.argv[1:]``.  The caller's sequence is never modified.

    Returns ``(include_functions, include_filenames, verbose)``:

    * ``verbose`` is true when ``-v`` or ``--verbose`` is present, or when the
      ``YAML_TEST_VERBOSE`` environment variable is set.
    * ``include_functions`` holds the first positional argument (if any) plus
      the whitespace-separated names in ``YAML_TEST_FUNCTIONS``.
    * ``include_filenames`` holds the remaining positional arguments plus the
      whitespace-separated names in ``YAML_TEST_FILENAMES``.
    """
    if args is None:
        args = sys.argv[1:]
    # Work on a private copy so the caller's list is left untouched.
    args = list(args)

    verbose_flags = ('-v', '--verbose')
    # Drop every occurrence of a verbose flag, so a repeated flag is never
    # mistaken for a function name.
    positional = [arg for arg in args if arg not in verbose_flags]
    verbose = len(positional) != len(args) or 'YAML_TEST_VERBOSE' in os.environ

    include_functions = positional[:1]
    if 'YAML_TEST_FUNCTIONS' in os.environ:
        include_functions.extend(os.environ['YAML_TEST_FUNCTIONS'].split())

    include_filenames = positional[1:]
    if 'YAML_TEST_FILENAMES' in os.environ:
        include_filenames.extend(os.environ['YAML_TEST_FILENAMES'].split())

    return include_functions, include_filenames, verbose


def execute(function, filenames, verbose):
    """Run one test *function* on *filenames* and report the outcome.

    The function is called as ``function(*filenames, verbose=verbose)``.
    An ``AssertionError`` is classified as ``'FAILURE'``, any other
    ``Exception`` as ``'ERROR'``, and a normal return as ``'SUCCESS'``.

    In verbose mode a header and (on problems) a one-frame traceback are
    written to stdout; otherwise a single progress character is written
    (``.``, ``F`` or ``E``).

    Returns ``(name, filenames, kind, info)`` where ``info`` is the
    ``sys.exc_info()`` triple for a failure/error and ``None`` on success.
    """
    name = function.__name__
    if verbose:
        sys.stdout.write('='*75+'\n')
        sys.stdout.write('%s(%s)...\n' % (name, ', '.join(filenames)))
    try:
        function(*filenames, verbose=verbose)
    except Exception as exc:
        # Keep the full exc_info so display() can print the traceback later.
        info = sys.exc_info()
        if isinstance(exc, AssertionError):
            kind = 'FAILURE'
        else:
            kind = 'ERROR'
        if verbose:
            traceback.print_exc(limit=1, file=sys.stdout)
        else:
            sys.stdout.write(kind[0])
            sys.stdout.flush()
    else:
        kind = 'SUCCESS'
        info = None
        if not verbose:
            sys.stdout.write('.')
            sys.stdout.flush()
    return (name, filenames, kind, info)


def display(results, verbose):
    """Print a report for *results* and return whether everything passed.

    *results* is a list of tuples as returned by ``execute()``.  For every
    non-successful result the traceback is printed, followed by the contents
    of each data file involved.  For failures, the arguments of the
    assertion are pretty-printed instead of the exception message.

    Returns ``True`` when there were no failures and no errors.
    """
    if results and not verbose:
        # Terminate the line of progress characters written by execute().
        sys.stdout.write('\n')
    total = len(results)
    failures = 0
    errors = 0
    for name, filenames, kind, info in results:
        if kind == 'SUCCESS':
            continue
        if kind == 'FAILURE':
            failures += 1
        if kind == 'ERROR':
            errors += 1
        sys.stdout.write('='*75+'\n')
        sys.stdout.write('%s(%s): %s\n' % (name, ', '.join(filenames), kind))
        if kind == 'ERROR':
            traceback.print_exception(*info, file=sys.stdout)
        else:
            sys.stdout.write('Traceback (most recent call last):\n')
            traceback.print_tb(info[2], file=sys.stdout)
            sys.stdout.write('%s: see below\n' % info[0].__name__)
            sys.stdout.write('~'*75+'\n')
            for arg in info[1].args:
                pprint.pprint(arg, stream=sys.stdout)
        for filename in filenames:
            sys.stdout.write('-'*75+'\n')
            sys.stdout.write('%s:\n' % filename)
            # errors='replace' keeps the report going when a data file
            # cannot be decoded with the default encoding.
            with open(filename, 'r', errors='replace') as data_file:
                data = data_file.read()
            sys.stdout.write(data)
            if data and data[-1] != '\n':
                sys.stdout.write('\n')
    sys.stdout.write('='*75+'\n')
    sys.stdout.write('TESTS: %s\n' % total)
    if failures:
        sys.stdout.write('FAILURES: %s\n' % failures)
    if errors:
        sys.stdout.write('ERRORS: %s\n' % errors)
    return not (failures or errors)


def run(collections, args=None):
    """Discover, filter, execute and report tests.

    Test functions are taken from *collections* (see
    ``find_test_functions``) and data files from the ``DATA`` directory.
    *args* is handed to ``parse_arguments`` to obtain the function filter,
    the file base-name filter and the verbosity.

    A function whose ``unittest`` attribute is a non-empty value other than
    ``True`` is treated as a sequence of file extensions: it is executed once
    for every data-file base name that has all of those extensions and none
    of the extensions listed in the function's optional ``skip`` attribute.
    Any other function is executed once with no file names.

    Returns the result of ``display()``: ``True`` when all tests passed.
    """
    test_functions = find_test_functions(collections)
    test_filenames = find_test_filenames(DATA)
    include_functions, include_filenames, verbose = parse_arguments(args)
    results = []
    for function in test_functions:
        if include_functions and function.__name__ not in include_functions:
            continue
        if function.unittest and function.unittest is not True:
            for base, exts in test_filenames:
                if include_filenames and base not in include_filenames:
                    continue
                # Every required extension must exist for this base name.
                if not all(ext in exts for ext in function.unittest):
                    continue
                # A single matching "skip" extension excludes the base name.
                skip_exts = getattr(function, 'skip', [])
                if any(skip_ext in exts for skip_ext in skip_exts):
                    continue
                filenames = [os.path.join(DATA, base+ext)
                             for ext in function.unittest]
                results.append(execute(function, filenames, verbose))
        else:
            results.append(execute(function, [], verbose))
    return display(results, verbose=verbose)