• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1
2import sys, os, os.path, types, traceback, pprint
3
4DATA = 'tests/data'
5
def find_test_functions(collections):
    """Collect the test functions exposed by one or more namespaces.

    ``collections`` is either a single namespace or a list of namespaces.
    Each namespace is a dict, or any object that ``vars()`` accepts (for
    example a module).  A value qualifies when it is a plain Python function
    carrying a ``unittest`` attribute.

    Returns a list of the qualifying functions, namespace by namespace, and
    ordered by attribute name within each namespace.
    """
    namespaces = collections if isinstance(collections, list) else [collections]
    found = []
    for namespace in namespaces:
        members = namespace if isinstance(namespace, dict) else vars(namespace)
        # Sorting the names keeps the discovery order deterministic.
        found.extend(
            members[name]
            for name in sorted(members)
            if isinstance(members[name], types.FunctionType)
            and hasattr(members[name], 'unittest')
        )
    return found
18
def find_test_filenames(directory):
    """Group the regular files found directly in ``directory`` by base name.

    Each file name is split with ``os.path.splitext``; the extensions of all
    files sharing a base name are gathered into one list.  Entries that are
    not regular files are ignored, as are files whose base name ends with
    ``-py2``.

    Returns a list of ``(base, extensions)`` tuples sorted by base name.
    """
    by_base = {}
    for entry in os.listdir(directory):
        if not os.path.isfile(os.path.join(directory, entry)):
            continue
        stem, suffix = os.path.splitext(entry)
        if stem.endswith('-py2'):
            continue
        if stem not in by_base:
            by_base[stem] = []
        by_base[stem].append(suffix)
    return sorted(by_base.items())
29
def parse_arguments(args):
    """Split command-line style arguments into test selection settings.

    ``args`` is a sequence of argument strings, or ``None`` to use
    ``sys.argv[1:]``.  The sequence passed in is never modified.

    * ``-v`` / ``--verbose`` (any number of occurrences) or the presence of
      the ``YAML_TEST_VERBOSE`` environment variable turns on verbose mode.
    * The first remaining argument, if any, names a test function to run;
      whitespace-separated names from ``YAML_TEST_FUNCTIONS`` are appended.
    * All further arguments are test file base names; whitespace-separated
      names from ``YAML_TEST_FILENAMES`` are appended.

    Returns the tuple ``(include_functions, include_filenames, verbose)``
    where the first two items are lists of strings and the last is a bool.
    """
    if args is None:
        args = sys.argv[1:]
    args = list(args)
    verbose_flags = ('-v', '--verbose')
    # Build a new list instead of calling remove()/pop() on the caller's
    # list, and drop *every* occurrence of a verbose flag so that a repeated
    # flag is not mistaken for a function name.
    positional = [arg for arg in args if arg not in verbose_flags]
    verbose = len(positional) != len(args) or 'YAML_TEST_VERBOSE' in os.environ

    include_functions = positional[:1]
    if 'YAML_TEST_FUNCTIONS' in os.environ:
        include_functions.extend(os.environ['YAML_TEST_FUNCTIONS'].split())

    include_filenames = positional[1:]
    if 'YAML_TEST_FILENAMES' in os.environ:
        include_filenames.extend(os.environ['YAML_TEST_FILENAMES'].split())

    return include_functions, include_filenames, verbose
52
def execute(function, filenames, verbose):
    """Run one test function and report its outcome on standard output.

    ``function`` is called with the entries of ``filenames`` as positional
    arguments plus a ``verbose`` keyword argument.  An ``AssertionError``
    counts as a ``'FAILURE'``, any other ``Exception`` as an ``'ERROR'``,
    and a normal return as a ``'SUCCESS'``.

    In verbose mode a header line and, on an exception, a one-frame
    traceback are printed; otherwise a single progress character
    (``.``, ``F`` or ``E``) is written.

    Returns ``(name, filenames, kind, info)`` where ``info`` is the
    ``sys.exc_info()`` triple for a failed run and ``None`` on success.
    """
    test_name = function.__name__
    if verbose:
        sys.stdout.write('=' * 75 + '\n')
        sys.stdout.write('%s(%s)...\n' % (test_name, ', '.join(filenames)))

    outcome, exc_details = 'SUCCESS', None
    try:
        function(*filenames, verbose=verbose)
    except Exception as error:
        exc_details = sys.exc_info()
        outcome = 'FAILURE' if isinstance(error, AssertionError) else 'ERROR'
        if verbose:
            traceback.print_exc(limit=1, file=sys.stdout)
        else:
            # Progress marker: first letter of the outcome ('F' or 'E').
            sys.stdout.write(outcome[0])
            sys.stdout.flush()
    else:
        if not verbose:
            sys.stdout.write('.')
    sys.stdout.flush()
    return (test_name, filenames, outcome, exc_details)
78
def display(results, verbose):
    """Print a report for every unsuccessful test and a final summary.

    ``results`` is a sequence of ``(name, filenames, kind, info)`` tuples as
    returned by ``execute``.  Entries whose kind is ``'SUCCESS'`` are
    skipped.  For an ``'ERROR'`` the full exception is printed; for any
    other kind the traceback is printed followed by a pretty-printed dump of
    the exception arguments.  The contents of every file in ``filenames``
    are then echoed.  Finally the totals are written.

    Returns ``True`` when no failures and no errors were counted, ``False``
    otherwise.
    """
    out = sys.stdout
    heavy_rule = '=' * 75 + '\n'
    light_rule = '-' * 75 + '\n'

    if results and not verbose:
        # Terminate the line of progress characters printed while running.
        out.write('\n')

    total = len(results)
    tally = {'FAILURE': 0, 'ERROR': 0}
    for name, filenames, kind, info in results:
        if kind == 'SUCCESS':
            continue
        if kind in tally:
            tally[kind] += 1

        out.write(heavy_rule)
        out.write('%s(%s): %s\n' % (name, ', '.join(filenames), kind))
        if kind == 'ERROR':
            traceback.print_exception(*info, file=out)
        else:
            out.write('Traceback (most recent call last):\n')
            traceback.print_tb(info[2], file=out)
            out.write('%s: see below\n' % info[0].__name__)
            out.write('~' * 75 + '\n')
            for detail in info[1].args:
                pprint.pprint(detail, stream=out)

        for path in filenames:
            out.write(light_rule)
            out.write('%s:\n' % path)
            with open(path, 'r', errors='replace') as handle:
                contents = handle.read()
            out.write(contents)
            # Make sure the dump ends with a newline before the next rule.
            if contents and not contents.endswith('\n'):
                out.write('\n')

    out.write(heavy_rule)
    out.write('TESTS: %s\n' % total)
    if tally['FAILURE']:
        out.write('FAILURES: %s\n' % tally['FAILURE'])
    if tally['ERROR']:
        out.write('ERRORS: %s\n' % tally['ERROR'])
    return not (tally['FAILURE'] or tally['ERROR'])
118
def run(collections, args=None):
    """Discover, filter, execute and report the tests in ``collections``.

    Test functions are gathered with ``find_test_functions`` and the data
    files under ``DATA`` with ``find_test_filenames``; ``args`` is handed to
    ``parse_arguments`` to obtain the function/file filters and the verbose
    flag.

    A function whose ``unittest`` attribute is a non-empty collection of
    extensions is executed once for every data file base name that provides
    all of those extensions and none of the extensions listed in the
    function's optional ``skip`` attribute.  Any other function is executed
    once with no file arguments.

    Returns the value of ``display`` for the collected results.
    """
    test_functions = find_test_functions(collections)
    test_filenames = find_test_filenames(DATA)
    include_functions, include_filenames, verbose = parse_arguments(args)

    results = []
    for function in test_functions:
        if include_functions and function.__name__ not in include_functions:
            continue

        required_exts = function.unittest
        if not required_exts or required_exts is True:
            # No data files needed: run the test exactly once.
            results.append(execute(function, [], verbose))
            continue

        for base, exts in test_filenames:
            if include_filenames and base not in include_filenames:
                continue
            # Every required extension must exist for this base name ...
            if not all(ext in exts for ext in required_exts):
                continue
            # ... and none of the extensions that mark the case as skipped.
            if any(skip_ext in exts for skip_ext in getattr(function, 'skip', [])):
                continue
            filenames = [os.path.join(DATA, base + ext) for ext in required_exts]
            results.append(execute(function, filenames, verbose))

    return display(results, verbose=verbose)
148
149