# Tests comparing the libyaml-backed (C) PyYAML classes with the pure-Python ones.

2import yaml._yaml, yaml
3import types, pprint, tempfile, sys, os
4
# Publish the implementations currently bound to the plain class names under
# ``Py``-prefixed aliases, so they stay reachable after the plain names are
# rebound elsewhere in this module.
for _class_name in ('BaseLoader', 'SafeLoader', 'Loader',
                    'BaseDumper', 'SafeDumper', 'Dumper'):
    setattr(yaml, 'Py' + _class_name, getattr(yaml, _class_name))
del _class_name
11
# Saved reference to the unpatched function; new_scan delegates to it.
old_scan = yaml.scan


def new_scan(stream, Loader=yaml.CLoader):
    """Call the saved ``yaml.scan`` with ``yaml.CLoader`` as the default loader."""
    return old_scan(stream, Loader=Loader)
15
# Saved reference to the unpatched function; new_parse delegates to it.
old_parse = yaml.parse


def new_parse(stream, Loader=yaml.CLoader):
    """Call the saved ``yaml.parse`` with ``yaml.CLoader`` as the default loader."""
    return old_parse(stream, Loader=Loader)
19
# Saved reference to the unpatched function; new_compose delegates to it.
old_compose = yaml.compose


def new_compose(stream, Loader=yaml.CLoader):
    """Call the saved ``yaml.compose`` with ``yaml.CLoader`` as the default loader."""
    return old_compose(stream, Loader=Loader)
23
# Saved reference to the unpatched function; new_compose_all delegates to it.
old_compose_all = yaml.compose_all


def new_compose_all(stream, Loader=yaml.CLoader):
    """Call the saved ``yaml.compose_all`` with ``yaml.CLoader`` as the default loader."""
    return old_compose_all(stream, Loader=Loader)
27
# Saved reference to the unpatched function; new_load delegates to it.
old_load = yaml.load


def new_load(stream, Loader=yaml.CLoader):
    """Call the saved ``yaml.load`` with ``yaml.CLoader`` as the default loader."""
    return old_load(stream, Loader=Loader)
31
# Saved reference to the unpatched function; new_load_all delegates to it.
old_load_all = yaml.load_all


def new_load_all(stream, Loader=yaml.CLoader):
    """Call the saved ``yaml.load_all`` with ``yaml.CLoader`` as the default loader."""
    return old_load_all(stream, Loader=Loader)
35
# Saved reference to the unpatched function. The wrapper below does not call
# it; it goes through the saved ``yaml.load`` instead.
old_safe_load = yaml.safe_load


def new_safe_load(stream):
    """Load *stream* through the saved ``yaml.load`` using ``yaml.CSafeLoader``."""
    return old_load(stream, Loader=yaml.CSafeLoader)
39
# Saved reference to the unpatched function. The wrapper below does not call
# it; it goes through the saved ``yaml.load_all`` instead.
old_safe_load_all = yaml.safe_load_all


def new_safe_load_all(stream):
    """Load all documents through the saved ``yaml.load_all`` using ``yaml.CSafeLoader``."""
    return old_load_all(stream, Loader=yaml.CSafeLoader)
43
# Saved reference to the unpatched function; new_emit delegates to it.
old_emit = yaml.emit


def new_emit(events, stream=None, Dumper=yaml.CDumper, **kwds):
    """Call the saved ``yaml.emit`` with ``yaml.CDumper`` as the default dumper.

    Extra keyword arguments are forwarded unchanged.
    """
    return old_emit(events, stream, Dumper=Dumper, **kwds)
47
# Saved reference to the unpatched function; new_serialize delegates to it.
old_serialize = yaml.serialize


def new_serialize(node, stream=None, Dumper=yaml.CDumper, **kwds):
    """Call the saved ``yaml.serialize`` with ``yaml.CDumper`` as the default dumper.

    ``stream`` defaults to ``None`` like the sibling wrappers
    (``new_serialize_all``, ``new_emit``, ``new_dump``), so a call that omits
    the stream keeps working while this wrapper is installed as
    ``yaml.serialize``. Extra keyword arguments are forwarded unchanged.
    """
    return old_serialize(node, stream, Dumper, **kwds)
51
# Saved reference to the unpatched function; new_serialize_all delegates to it.
old_serialize_all = yaml.serialize_all


def new_serialize_all(nodes, stream=None, Dumper=yaml.CDumper, **kwds):
    """Call the saved ``yaml.serialize_all`` with ``yaml.CDumper`` as the default dumper.

    Extra keyword arguments are forwarded unchanged.
    """
    return old_serialize_all(nodes, stream, Dumper=Dumper, **kwds)
55
# Saved reference to the unpatched function; new_dump delegates to it.
old_dump = yaml.dump


def new_dump(data, stream=None, Dumper=yaml.CDumper, **kwds):
    """Call the saved ``yaml.dump`` with ``yaml.CDumper`` as the default dumper.

    Extra keyword arguments are forwarded unchanged.
    """
    return old_dump(data, stream, Dumper=Dumper, **kwds)
59
# Saved reference to the unpatched function; new_dump_all delegates to it.
old_dump_all = yaml.dump_all


def new_dump_all(documents, stream=None, Dumper=yaml.CDumper, **kwds):
    """Call the saved ``yaml.dump_all`` with ``yaml.CDumper`` as the default dumper.

    Extra keyword arguments are forwarded unchanged.
    """
    return old_dump_all(documents, stream, Dumper=Dumper, **kwds)
63
# Saved reference to the unpatched function. The wrapper below does not call
# it; it goes through the saved ``yaml.dump`` instead.
old_safe_dump = yaml.safe_dump


def new_safe_dump(data, stream=None, **kwds):
    """Dump *data* through the saved ``yaml.dump`` using ``yaml.CSafeDumper``.

    Extra keyword arguments are forwarded unchanged.
    """
    return old_dump(data, stream, Dumper=yaml.CSafeDumper, **kwds)
67
# Saved reference to the unpatched function. The wrapper below does not call
# it; it goes through the saved ``yaml.dump_all`` instead.
old_safe_dump_all = yaml.safe_dump_all


def new_safe_dump_all(documents, stream=None, **kwds):
    """Dump *documents* through the saved ``yaml.dump_all`` using ``yaml.CSafeDumper``.

    Extra keyword arguments are forwarded unchanged.
    """
    return old_dump_all(documents, stream, Dumper=yaml.CSafeDumper, **kwds)
71
def _set_up():
    """Rebind the public ``yaml`` names to the C-backed variants.

    The six loader/dumper class names are pointed at their ``C``-prefixed
    counterparts, and each module-level helper function is replaced by the
    matching ``new_*`` wrapper from this module.
    """
    for class_name in ('BaseLoader', 'SafeLoader', 'Loader',
                       'BaseDumper', 'SafeDumper', 'Dumper'):
        setattr(yaml, class_name, getattr(yaml, 'C' + class_name))

    replacements = (
        ('scan', new_scan),
        ('parse', new_parse),
        ('compose', new_compose),
        ('compose_all', new_compose_all),
        ('load', new_load),
        ('load_all', new_load_all),
        ('safe_load', new_safe_load),
        ('safe_load_all', new_safe_load_all),
        ('emit', new_emit),
        ('serialize', new_serialize),
        ('serialize_all', new_serialize_all),
        ('dump', new_dump),
        ('dump_all', new_dump_all),
        ('safe_dump', new_safe_dump),
        ('safe_dump_all', new_safe_dump_all),
    )
    for api_name, replacement in replacements:
        setattr(yaml, api_name, replacement)
94
def _tear_down():
    """Restore the ``yaml`` names that ``_set_up`` rebinds.

    The six loader/dumper class names are pointed back at their
    ``Py``-prefixed aliases, and each module-level helper function is reset
    to the ``old_*`` reference saved at import time.
    """
    for class_name in ('BaseLoader', 'SafeLoader', 'Loader',
                       'BaseDumper', 'SafeDumper', 'Dumper'):
        setattr(yaml, class_name, getattr(yaml, 'Py' + class_name))

    originals = (
        ('scan', old_scan),
        ('parse', old_parse),
        ('compose', old_compose),
        ('compose_all', old_compose_all),
        ('load', old_load),
        ('load_all', old_load_all),
        ('safe_load', old_safe_load),
        ('safe_load_all', old_safe_load_all),
        ('emit', old_emit),
        ('serialize', old_serialize),
        ('serialize_all', old_serialize_all),
        ('dump', old_dump),
        ('dump_all', old_dump_all),
        ('safe_dump', old_safe_dump),
        ('safe_dump_all', old_safe_dump_all),
    )
    for api_name, original in originals:
        setattr(yaml, api_name, original)
117
def test_c_version(verbose=False):
    """Check that the extension's version tuple matches its version string.

    ``yaml._yaml.get_version()`` is formatted as ``"%s.%s.%s"`` and compared
    with ``yaml._yaml.get_version_string()``.

    The module is always reached through ``yaml._yaml``: this file only does
    ``import yaml._yaml, yaml``, which does not bind a bare ``_yaml`` name, so
    referring to ``_yaml`` directly would raise ``NameError``.

    :param verbose: when true, print both values before comparing them.
    """
    version = yaml._yaml.get_version()
    version_string = yaml._yaml.get_version_string()
    if verbose:
        print(version)
        print(version_string)
    assert ("%s.%s.%s" % version) == version_string, (version, version_string)
124
def test_deprecate_yaml_module():
    """Check that the top-level ``_yaml`` module is importable and usable.

    Asserts that ``_yaml.__package__`` is the empty string and that the
    module exposes a version string.
    """
    import _yaml
    assert _yaml.__package__ == ''
    # test_c_version formats get_version() with "%s.%s.%s", i.e. as a
    # three-element tuple, so the str check belongs on get_version_string().
    # NOTE(review): assumes the top-level _yaml module re-exports yaml._yaml's
    # functions -- confirm against the _yaml shim.
    assert isinstance(_yaml.get_version_string(), str)
129
def _compare_scanners(py_data, c_data, verbose):
    """Scan the same input with ``PyLoader`` and ``CLoader`` and compare tokens.

    The two token streams must have the same length; paired tokens must have
    the same class, the same ``value`` (when the Python token has one) and,
    except for ``StreamEndToken``, the same start/end mark index, line and
    column.

    :param py_data: input handed to ``yaml.scan`` with ``yaml.PyLoader``.
    :param c_data: input handed to ``yaml.scan`` with ``yaml.CLoader``.
    :param verbose: when true, both token lists are pretty-printed at the
        end, whether or not the comparison succeeded.
    """
    def position(mark):
        return (mark.index, mark.line, mark.column)

    py_tokens = list(yaml.scan(py_data, Loader=yaml.PyLoader))
    c_tokens = []
    try:
        # Append one token at a time so that everything scanned before a
        # failure is still in c_tokens for the verbose dump below.
        for c_token in yaml.scan(c_data, Loader=yaml.CLoader):
            c_tokens.append(c_token)
        py_count, c_count = len(py_tokens), len(c_tokens)
        assert py_count == c_count, (py_count, c_count)
        for py_token, c_token in zip(py_tokens, c_tokens):
            assert py_token.__class__ == c_token.__class__, (py_token, c_token)
            if hasattr(py_token, 'value'):
                assert py_token.value == c_token.value, (py_token, c_token)
            if not isinstance(py_token, yaml.StreamEndToken):
                py_start = position(py_token.start_mark)
                py_end = position(py_token.end_mark)
                c_start = position(c_token.start_mark)
                c_end = position(c_token.end_mark)
                assert py_start == c_start, (py_start, c_start)
                assert py_end == c_end, (py_end, c_end)
    finally:
        if verbose:
            for title, tokens in (("PY_TOKENS:", py_tokens),
                                  ("C_TOKENS:", c_tokens)):
                print(title)
                pprint.pprint(tokens)
155
def test_c_scanner(data_filename, canonical_filename, verbose=False):
    """Compare the Python and C scanners on a data file and its canonical form.

    Each file is checked twice: first passing open binary file objects, then
    passing the raw bytes read from them.
    """
    for filename in (data_filename, canonical_filename):
        with open(filename, 'rb') as py_stream, open(filename, 'rb') as c_stream:
            _compare_scanners(py_stream, c_stream, verbose)
        with open(filename, 'rb') as py_stream, open(filename, 'rb') as c_stream:
            _compare_scanners(py_stream.read(), c_stream.read(), verbose)

test_c_scanner.unittest = ['.data', '.canonical']
test_c_scanner.skip = ['.skip-ext']
168
def _compare_parsers(py_data, c_data, verbose):
    """Parse the same input with ``PyLoader`` and ``CLoader`` and compare events.

    The two event streams must have the same length, and paired events must
    agree on each compared attribute (a missing attribute counts as
    ``None``).

    :param py_data: input handed to ``yaml.parse`` with ``yaml.PyLoader``.
    :param c_data: input handed to ``yaml.parse`` with ``yaml.CLoader``.
    :param verbose: when true, both event lists are pretty-printed at the
        end, whether or not the comparison succeeded.
    """
    compared_attributes = ('__class__', 'anchor', 'tag', 'implicit',
                           'value', 'explicit', 'version', 'tags')
    py_events = list(yaml.parse(py_data, Loader=yaml.PyLoader))
    c_events = []
    try:
        # Append one event at a time so that everything parsed before a
        # failure is still in c_events for the verbose dump below.
        for c_event in yaml.parse(c_data, Loader=yaml.CLoader):
            c_events.append(c_event)
        py_count, c_count = len(py_events), len(c_events)
        assert py_count == c_count, (py_count, c_count)
        for py_event, c_event in zip(py_events, c_events):
            for attribute in compared_attributes:
                py_value = getattr(py_event, attribute, None)
                c_value = getattr(c_event, attribute, None)
                assert py_value == c_value, (py_event, c_event, attribute)
    finally:
        if verbose:
            for title, events in (("PY_EVENTS:", py_events),
                                  ("C_EVENTS:", c_events)):
                print(title)
                pprint.pprint(events)
188
def test_c_parser(data_filename, canonical_filename, verbose=False):
    """Compare the Python and C parsers on a data file and its canonical form.

    Each file is checked twice: first passing open binary file objects, then
    passing the raw bytes read from them.
    """
    for filename in (data_filename, canonical_filename):
        with open(filename, 'rb') as py_stream, open(filename, 'rb') as c_stream:
            _compare_parsers(py_stream, c_stream, verbose)
        with open(filename, 'rb') as py_stream, open(filename, 'rb') as c_stream:
            _compare_parsers(py_stream.read(), c_stream.read(), verbose)

test_c_parser.unittest = ['.data', '.canonical']
test_c_parser.skip = ['.skip-ext']
201
def _compare_emitters(data, verbose):
    """Round-trip *data* through the C emitter and compare the event streams.

    *data* is parsed with ``PyLoader``, the events are emitted with
    ``CDumper``, and the emitted text is parsed again with both ``PyLoader``
    and ``CLoader``. Both re-parsed streams must match the original events
    in length and in each compared attribute, with two exemptions: a ``tag``
    that is ``None`` or ``'!'`` in all three events, and ``explicit``
    whenever either re-parsed event has it set.

    :param data: YAML input to parse.
    :param verbose: when true, print the emitted text and, at the end, all
        three event lists.
    """
    compared_attributes = ('__class__', 'anchor', 'tag', 'implicit',
                           'value', 'explicit', 'version', 'tags')
    plain_tags = (None, '!')

    events = list(yaml.parse(data, Loader=yaml.PyLoader))
    c_data = yaml.emit(events, Dumper=yaml.CDumper)
    if verbose:
        print(c_data)
    py_events = list(yaml.parse(c_data, Loader=yaml.PyLoader))
    c_events = list(yaml.parse(c_data, Loader=yaml.CLoader))
    try:
        expected_count = len(events)
        assert expected_count == len(py_events), (expected_count, len(py_events))
        assert expected_count == len(c_events), (expected_count, len(c_events))
        for event, py_event, c_event in zip(events, py_events, c_events):
            for attribute in compared_attributes:
                value = getattr(event, attribute, None)
                py_value = getattr(py_event, attribute, None)
                c_value = getattr(c_event, attribute, None)
                if attribute == 'tag' and all(
                        tag in plain_tags for tag in (value, py_value, c_value)):
                    continue
                if attribute == 'explicit' and (py_value or c_value):
                    continue
                assert value == py_value, (event, py_event, attribute)
                assert value == c_value, (event, c_event, attribute)
    finally:
        if verbose:
            for title, dumped in (("EVENTS:", events),
                                  ("PY_EVENTS:", py_events),
                                  ("C_EVENTS:", c_events)):
                print(title)
                pprint.pprint(dumped)
233
def test_c_emitter(data_filename, canonical_filename, verbose=False):
    """Run the emitter round-trip comparison on a data file and its canonical form.

    Each file is read fully in binary mode and its bytes are passed on.
    """
    for filename in (data_filename, canonical_filename):
        with open(filename, 'rb') as stream:
            _compare_emitters(stream.read(), verbose)

test_c_emitter.unittest = ['.data', '.canonical']
test_c_emitter.skip = ['.skip-ext']
242
def test_large_file(verbose=False):
    """Load a temporary file larger than 2**31 bytes with ``yaml.CLoader``.

    The test is skipped (returns immediately) when ``sys.maxsize <= 2**32``
    or when the ``PYYAML_TEST_GROUP`` environment variable is not ``'all'``.

    :param verbose: accepted for signature parity with the other tests; unused.
    """
    SIZE_LINE = 24
    SIZE_ITERATION = 0
    SIZE_FILE = 31
    if sys.maxsize <= 2**32:
        return
    if os.environ.get('PYYAML_TEST_GROUP', '') != 'all':
        return
    # One line is '-' + padding + '{}\n', i.e. exactly 2**SIZE_LINE bytes.
    # Build and encode the chunk once instead of on every loop iteration.
    line = '-' + ' ' * (2**SIZE_LINE - 4) + '{}\n'
    chunk = (line * 2**SIZE_ITERATION).encode('utf-8')
    with tempfile.TemporaryFile() as temp_file:
        # The "+ 1" pushes the total size just past 2**SIZE_FILE bytes.
        for _ in range(2**(SIZE_FILE - SIZE_ITERATION - SIZE_LINE) + 1):
            temp_file.write(chunk)
        temp_file.seek(0)
        yaml.load(temp_file, Loader=yaml.CLoader)

test_large_file.unittest = None
258
def wrap_ext_function(function):
    """Return a variant of *function* that runs between ``_set_up`` and ``_tear_down``.

    The returned wrapper is named ``<function name>_ext``, shares the
    ``unittest`` attribute of *function*, and gets a ``skip`` list made of
    the function's own ``skip`` entries (if any) followed by ``'.skip-ext'``.
    ``_tear_down`` runs even when *function* raises.
    """
    inherited_skip = getattr(function, 'skip', [])

    def wrapper(*args, **kwds):
        _set_up()
        try:
            function(*args, **kwds)
        finally:
            _tear_down()

    wrapper.__name__ = '%s_ext' % function.__name__
    wrapper.unittest = function.unittest
    wrapper.skip = inherited_skip + ['.skip-ext']
    return wrapper
270
def wrap_ext(collections):
    """Install ``*_ext`` variants of the test functions found in *collections*.

    *collections* is a single module/dict or a list of them. Non-dict
    entries are read through ``vars()``. Every plain function that has a
    ``unittest`` attribute is wrapped with ``wrap_ext_function`` (visiting
    names in sorted order) and then stored in this module's globals under
    the wrapper's name, which must not already be taken.
    """
    if not isinstance(collections, list):
        collections = [collections]

    wrapped = []
    for collection in collections:
        namespace = collection if isinstance(collection, dict) else vars(collection)
        wrapped.extend(
            wrap_ext_function(namespace[key])
            for key in sorted(namespace)
            if isinstance(namespace[key], types.FunctionType)
            and hasattr(namespace[key], 'unittest')
        )

    module_globals = globals()
    for function in wrapped:
        assert function.__name__ not in module_globals
        module_globals[function.__name__] = function
285
# Sibling test modules whose test functions also get ``*_ext`` variants.
import test_tokens
import test_structure
import test_errors
import test_resolver
import test_constructor
import test_emitter
import test_representer
import test_recursive
import test_input_output

wrap_ext([
    test_tokens,
    test_structure,
    test_errors,
    test_resolver,
    test_constructor,
    test_emitter,
    test_representer,
    test_recursive,
    test_input_output,
])

if __name__ == '__main__':
    # Run everything defined in this module through the shared test runner.
    import test_appliance
    test_appliance.run(globals())
294
295