• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1
2import yaml._yaml, yaml
3import types, pprint, tempfile, sys, os
4
5yaml.PyBaseLoader = yaml.BaseLoader
6yaml.PySafeLoader = yaml.SafeLoader
7yaml.PyLoader = yaml.Loader
8yaml.PyBaseDumper = yaml.BaseDumper
9yaml.PySafeDumper = yaml.SafeDumper
10yaml.PyDumper = yaml.Dumper
11
12old_scan = yaml.scan
13def new_scan(stream, Loader=yaml.CLoader):
14    return old_scan(stream, Loader)
15
16old_parse = yaml.parse
17def new_parse(stream, Loader=yaml.CLoader):
18    return old_parse(stream, Loader)
19
20old_compose = yaml.compose
21def new_compose(stream, Loader=yaml.CLoader):
22    return old_compose(stream, Loader)
23
24old_compose_all = yaml.compose_all
25def new_compose_all(stream, Loader=yaml.CLoader):
26    return old_compose_all(stream, Loader)
27
28old_load = yaml.load
29def new_load(stream, Loader=yaml.CLoader):
30    return old_load(stream, Loader)
31
32old_load_all = yaml.load_all
33def new_load_all(stream, Loader=yaml.CLoader):
34    return old_load_all(stream, Loader)
35
36old_safe_load = yaml.safe_load
37def new_safe_load(stream):
38    return old_load(stream, yaml.CSafeLoader)
39
40old_safe_load_all = yaml.safe_load_all
41def new_safe_load_all(stream):
42    return old_load_all(stream, yaml.CSafeLoader)
43
44old_emit = yaml.emit
45def new_emit(events, stream=None, Dumper=yaml.CDumper, **kwds):
46    return old_emit(events, stream, Dumper, **kwds)
47
48old_serialize = yaml.serialize
49def new_serialize(node, stream, Dumper=yaml.CDumper, **kwds):
50    return old_serialize(node, stream, Dumper, **kwds)
51
52old_serialize_all = yaml.serialize_all
53def new_serialize_all(nodes, stream=None, Dumper=yaml.CDumper, **kwds):
54    return old_serialize_all(nodes, stream, Dumper, **kwds)
55
56old_dump = yaml.dump
57def new_dump(data, stream=None, Dumper=yaml.CDumper, **kwds):
58    return old_dump(data, stream, Dumper, **kwds)
59
60old_dump_all = yaml.dump_all
61def new_dump_all(documents, stream=None, Dumper=yaml.CDumper, **kwds):
62    return old_dump_all(documents, stream, Dumper, **kwds)
63
64old_safe_dump = yaml.safe_dump
65def new_safe_dump(data, stream=None, **kwds):
66    return old_dump(data, stream, yaml.CSafeDumper, **kwds)
67
68old_safe_dump_all = yaml.safe_dump_all
69def new_safe_dump_all(documents, stream=None, **kwds):
70    return old_dump_all(documents, stream, yaml.CSafeDumper, **kwds)
71
72def _set_up():
73    yaml.BaseLoader = yaml.CBaseLoader
74    yaml.SafeLoader = yaml.CSafeLoader
75    yaml.Loader = yaml.CLoader
76    yaml.BaseDumper = yaml.CBaseDumper
77    yaml.SafeDumper = yaml.CSafeDumper
78    yaml.Dumper = yaml.CDumper
79    yaml.scan = new_scan
80    yaml.parse = new_parse
81    yaml.compose = new_compose
82    yaml.compose_all = new_compose_all
83    yaml.load = new_load
84    yaml.load_all = new_load_all
85    yaml.safe_load = new_safe_load
86    yaml.safe_load_all = new_safe_load_all
87    yaml.emit = new_emit
88    yaml.serialize = new_serialize
89    yaml.serialize_all = new_serialize_all
90    yaml.dump = new_dump
91    yaml.dump_all = new_dump_all
92    yaml.safe_dump = new_safe_dump
93    yaml.safe_dump_all = new_safe_dump_all
94
95def _tear_down():
96    yaml.BaseLoader = yaml.PyBaseLoader
97    yaml.SafeLoader = yaml.PySafeLoader
98    yaml.Loader = yaml.PyLoader
99    yaml.BaseDumper = yaml.PyBaseDumper
100    yaml.SafeDumper = yaml.PySafeDumper
101    yaml.Dumper = yaml.PyDumper
102    yaml.scan = old_scan
103    yaml.parse = old_parse
104    yaml.compose = old_compose
105    yaml.compose_all = old_compose_all
106    yaml.load = old_load
107    yaml.load_all = old_load_all
108    yaml.safe_load = old_safe_load
109    yaml.safe_load_all = old_safe_load_all
110    yaml.emit = old_emit
111    yaml.serialize = old_serialize
112    yaml.serialize_all = old_serialize_all
113    yaml.dump = old_dump
114    yaml.dump_all = old_dump_all
115    yaml.safe_dump = old_safe_dump
116    yaml.safe_dump_all = old_safe_dump_all
117
118def test_c_version(verbose=False):
119    if verbose:
120        print yaml._yaml.get_version()
121        print yaml._yaml.get_version_string()
122    assert ("%s.%s.%s" % yaml._yaml.get_version()) == yaml._yaml.get_version_string(),    \
123            (_yaml.get_version(), yaml._yaml.get_version_string())
124
125def test_deprecate_yaml_module():
126    import _yaml
127    assert _yaml.__package__ == ''
128    assert isinstance(_yaml.get_version(), str)
129
130def _compare_scanners(py_data, c_data, verbose):
131    py_tokens = list(yaml.scan(py_data, Loader=yaml.PyLoader))
132    c_tokens = []
133    try:
134        for token in yaml.scan(c_data, Loader=yaml.CLoader):
135            c_tokens.append(token)
136        assert len(py_tokens) == len(c_tokens), (len(py_tokens), len(c_tokens))
137        for py_token, c_token in zip(py_tokens, c_tokens):
138            assert py_token.__class__ == c_token.__class__, (py_token, c_token)
139            if hasattr(py_token, 'value'):
140                assert py_token.value == c_token.value, (py_token, c_token)
141            if isinstance(py_token, yaml.StreamEndToken):
142                continue
143            py_start = (py_token.start_mark.index, py_token.start_mark.line, py_token.start_mark.column)
144            py_end = (py_token.end_mark.index, py_token.end_mark.line, py_token.end_mark.column)
145            c_start = (c_token.start_mark.index, c_token.start_mark.line, c_token.start_mark.column)
146            c_end = (c_token.end_mark.index, c_token.end_mark.line, c_token.end_mark.column)
147            assert py_start == c_start, (py_start, c_start)
148            assert py_end == c_end, (py_end, c_end)
149    finally:
150        if verbose:
151            print "PY_TOKENS:"
152            pprint.pprint(py_tokens)
153            print "C_TOKENS:"
154            pprint.pprint(c_tokens)
155
156def test_c_scanner(data_filename, canonical_filename, verbose=False):
157    _compare_scanners(open(data_filename, 'rb'),
158            open(data_filename, 'rb'), verbose)
159    _compare_scanners(open(data_filename, 'rb').read(),
160            open(data_filename, 'rb').read(), verbose)
161    _compare_scanners(open(canonical_filename, 'rb'),
162            open(canonical_filename, 'rb'), verbose)
163    _compare_scanners(open(canonical_filename, 'rb').read(),
164            open(canonical_filename, 'rb').read(), verbose)
165
166test_c_scanner.unittest = ['.data', '.canonical']
167test_c_scanner.skip = ['.skip-ext']
168
169def _compare_parsers(py_data, c_data, verbose):
170    py_events = list(yaml.parse(py_data, Loader=yaml.PyLoader))
171    c_events = []
172    try:
173        for event in yaml.parse(c_data, Loader=yaml.CLoader):
174            c_events.append(event)
175        assert len(py_events) == len(c_events), (len(py_events), len(c_events))
176        for py_event, c_event in zip(py_events, c_events):
177            for attribute in ['__class__', 'anchor', 'tag', 'implicit',
178                                'value', 'explicit', 'version', 'tags']:
179                py_value = getattr(py_event, attribute, None)
180                c_value = getattr(c_event, attribute, None)
181                assert py_value == c_value, (py_event, c_event, attribute)
182    finally:
183        if verbose:
184            print "PY_EVENTS:"
185            pprint.pprint(py_events)
186            print "C_EVENTS:"
187            pprint.pprint(c_events)
188
189def test_c_parser(data_filename, canonical_filename, verbose=False):
190    _compare_parsers(open(data_filename, 'rb'),
191            open(data_filename, 'rb'), verbose)
192    _compare_parsers(open(data_filename, 'rb').read(),
193            open(data_filename, 'rb').read(), verbose)
194    _compare_parsers(open(canonical_filename, 'rb'),
195            open(canonical_filename, 'rb'), verbose)
196    _compare_parsers(open(canonical_filename, 'rb').read(),
197            open(canonical_filename, 'rb').read(), verbose)
198
199test_c_parser.unittest = ['.data', '.canonical']
200test_c_parser.skip = ['.skip-ext']
201
202def _compare_emitters(data, verbose):
203    events = list(yaml.parse(data, Loader=yaml.PyLoader))
204    c_data = yaml.emit(events, Dumper=yaml.CDumper)
205    if verbose:
206        print c_data
207    py_events = list(yaml.parse(c_data, Loader=yaml.PyLoader))
208    c_events = list(yaml.parse(c_data, Loader=yaml.CLoader))
209    try:
210        assert len(events) == len(py_events), (len(events), len(py_events))
211        assert len(events) == len(c_events), (len(events), len(c_events))
212        for event, py_event, c_event in zip(events, py_events, c_events):
213            for attribute in ['__class__', 'anchor', 'tag', 'implicit',
214                                'value', 'explicit', 'version', 'tags']:
215                value = getattr(event, attribute, None)
216                py_value = getattr(py_event, attribute, None)
217                c_value = getattr(c_event, attribute, None)
218                if attribute == 'tag' and value in [None, u'!'] \
219                        and py_value in [None, u'!'] and c_value in [None, u'!']:
220                    continue
221                if attribute == 'explicit' and (py_value or c_value):
222                    continue
223                assert value == py_value, (event, py_event, attribute)
224                assert value == c_value, (event, c_event, attribute)
225    finally:
226        if verbose:
227            print "EVENTS:"
228            pprint.pprint(events)
229            print "PY_EVENTS:"
230            pprint.pprint(py_events)
231            print "C_EVENTS:"
232            pprint.pprint(c_events)
233
234def test_c_emitter(data_filename, canonical_filename, verbose=False):
235    _compare_emitters(open(data_filename, 'rb').read(), verbose)
236    _compare_emitters(open(canonical_filename, 'rb').read(), verbose)
237
238test_c_emitter.unittest = ['.data', '.canonical']
239test_c_emitter.skip = ['.skip-ext']
240
241def test_large_file(verbose=False):
242    SIZE_LINE = 24
243    SIZE_ITERATION = 0
244    SIZE_FILE = 31
245    if sys.maxsize <= 2**32:
246        return
247    if os.environ.get('PYYAML_TEST_GROUP', '') != 'all':
248        return
249    with tempfile.TemporaryFile() as temp_file:
250        for i in range(2**(SIZE_FILE-SIZE_ITERATION-SIZE_LINE) + 1):
251            temp_file.write(('-' + (' ' * (2**SIZE_LINE-4))+ '{}\n')*(2**SIZE_ITERATION))
252        temp_file.seek(0)
253        yaml.load(temp_file, Loader=yaml.CLoader)
254
255test_large_file.unittest = None
256
257def wrap_ext_function(function):
258    def wrapper(*args, **kwds):
259        _set_up()
260        try:
261            function(*args, **kwds)
262        finally:
263            _tear_down()
264    try:
265        wrapper.func_name = '%s_ext' % function.func_name
266    except TypeError:
267        pass
268    wrapper.unittest_name = '%s_ext' % function.func_name
269    wrapper.unittest = function.unittest
270    wrapper.skip = getattr(function, 'skip', [])+['.skip-ext']
271    return wrapper
272
273def wrap_ext(collections):
274    functions = []
275    if not isinstance(collections, list):
276        collections = [collections]
277    for collection in collections:
278        if not isinstance(collection, dict):
279            collection = vars(collection)
280        keys = collection.keys()
281        keys.sort()
282        for key in keys:
283            value = collection[key]
284            if isinstance(value, types.FunctionType) and hasattr(value, 'unittest'):
285                functions.append(wrap_ext_function(value))
286    for function in functions:
287        assert function.unittest_name not in globals()
288        globals()[function.unittest_name] = function
289
290import test_tokens, test_structure, test_errors, test_resolver, test_constructor,   \
291        test_emitter, test_representer, test_recursive, test_input_output
292wrap_ext([test_tokens, test_structure, test_errors, test_resolver, test_constructor,
293        test_emitter, test_representer, test_recursive, test_input_output])
294
295if __name__ == '__main__':
296    import test_appliance
297    test_appliance.run(globals())
298
299