• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1
2import yaml, canonical
3import pprint
4
def _convert_structure(loader):
    """Consume the events of one node from *loader* and return its shape.

    The result mirrors the nesting of the event stream:

    * scalar   -> ``True`` if the event has a tag, an anchor or a value,
                  otherwise ``None``
    * sequence -> list of converted items
    * mapping  -> list of ``(key, value)`` tuples of converted nodes
    * alias    -> ``'*'``
    * anything else -> ``'?'``
    """
    if loader.check_event(yaml.ScalarEvent):
        scalar = loader.get_event()
        return True if (scalar.tag or scalar.anchor or scalar.value) else None

    if loader.check_event(yaml.SequenceStartEvent):
        loader.get_event()  # discard the start marker
        items = []
        while not loader.check_event(yaml.SequenceEndEvent):
            items.append(_convert_structure(loader))
        loader.get_event()  # discard the end marker
        return items

    if loader.check_event(yaml.MappingStartEvent):
        loader.get_event()  # discard the start marker
        pairs = []
        while not loader.check_event(yaml.MappingEndEvent):
            # The key must be consumed before the value: both calls advance
            # the same event stream.
            key = _convert_structure(loader)
            value = _convert_structure(loader)
            pairs.append((key, value))
        loader.get_event()  # discard the end marker
        return pairs

    # Aliases and any unrecognised event are consumed and replaced by a marker.
    marker = '*' if loader.check_event(yaml.AliasEvent) else '?'
    loader.get_event()
    return marker
34
def test_structure(data_filename, structure_filename, verbose=False):
    """Check that the event structure of a data file matches the expected one.

    The expected structure is read from *structure_filename*; the actual one
    is built by running ``_convert_structure`` over the events that
    ``yaml.Loader`` produces for *data_filename*, skipping stream and document
    boundary events.  When exactly one node was collected it is compared
    directly rather than as a one-element list.

    With *verbose* set, both structures are pretty-printed whether or not the
    comparison succeeded.
    """
    # NOTE(review): eval() executes whatever the structure file contains.
    # Acceptable only for trusted fixture files; never point this at
    # untrusted input.
    with open(structure_filename, 'r') as structure_file:
        nodes2 = eval(structure_file.read())
    nodes1 = []
    try:
        with open(data_filename, 'rb') as data_file:
            loader = yaml.Loader(data_file)
            while loader.check_event():
                is_boundary = loader.check_event(
                    yaml.StreamStartEvent, yaml.StreamEndEvent,
                    yaml.DocumentStartEvent, yaml.DocumentEndEvent,
                )
                if is_boundary:
                    loader.get_event()
                else:
                    nodes1.append(_convert_structure(loader))
        if len(nodes1) == 1:
            nodes1 = nodes1[0]
        assert nodes1 == nodes2, (nodes1, nodes2)
    finally:
        if verbose:
            for title, nodes in (("NODES1:", nodes1), ("NODES2:", nodes2)):
                print(title)
                pprint.pprint(nodes)

# File extensions of the fixtures this test takes; presumably read by
# test_appliance to pair up input files -- confirm against test_appliance.
test_structure.unittest = ['.data', '.structure']
61
def _compare_events(events1, events2, full=False):
    """Assert that two event lists describe the same YAML stream.

    Both lists must have the same length and pairwise identical event
    classes.  Scalar values must always match.  Tags of scalar and
    collection-start events are compared when both are explicit (neither
    ``None`` nor ``'!'``) or when *full* is true.  Alias anchors are compared
    only when *full* is true.

    Raises ``AssertionError`` carrying the offending pair on any mismatch.
    """
    assert len(events1) == len(events2), (len(events1), len(events2))
    implicit_tags = (None, '!')
    for left, right in zip(events1, events2):
        pair = (left, right)
        assert left.__class__ == right.__class__, pair
        if full and isinstance(left, yaml.AliasEvent):
            assert left.anchor == right.anchor, pair
        if isinstance(left, (yaml.ScalarEvent, yaml.CollectionStartEvent)):
            both_explicit = (left.tag not in implicit_tags
                             and right.tag not in implicit_tags)
            if both_explicit or full:
                assert left.tag == right.tag, pair
        if isinstance(left, yaml.ScalarEvent):
            assert left.value == right.value, pair
73
def test_parser(data_filename, canonical_filename, verbose=False):
    """Compare the events parsed from a data file with its canonical form.

    *data_filename* is parsed with ``yaml.parse`` and *canonical_filename*
    with ``yaml.canonical_parse``; the two event lists are then checked with
    ``_compare_events`` in its default (non-full) mode.

    With *verbose* set, both event lists are pretty-printed whether or not
    the comparison succeeded (``None`` is shown for a list that was never
    produced).
    """
    events1 = events2 = None
    try:
        with open(data_filename, 'rb') as data_file:
            events1 = list(yaml.parse(data_file))
        with open(canonical_filename, 'rb') as canonical_file:
            events2 = list(yaml.canonical_parse(canonical_file))
        _compare_events(events1, events2)
    finally:
        if verbose:
            for title, events in (("EVENTS1:", events1), ("EVENTS2:", events2)):
                print(title)
                pprint.pprint(events)

# File extensions of the fixtures this test takes; presumably read by
# test_appliance to pair up input files -- confirm against test_appliance.
test_parser.unittest = ['.data', '.canonical']
91
def test_parser_on_canonical(canonical_filename, verbose=False):
    """Check that both parsers agree on a canonical file.

    The same file is parsed once with ``yaml.parse`` and once with
    ``yaml.canonical_parse``; the resulting event lists are compared with
    ``_compare_events(..., full=True)``, so tags and alias anchors must match
    as well.

    With *verbose* set, both event lists are pretty-printed whether or not
    the comparison succeeded.
    """
    events1 = None
    events2 = None

    def _dump(title, events):
        # Helper for the verbose report below.
        print(title)
        pprint.pprint(events)

    try:
        with open(canonical_filename, 'rb') as stream:
            events1 = list(yaml.parse(stream))
        with open(canonical_filename, 'rb') as stream:
            events2 = list(yaml.canonical_parse(stream))
        _compare_events(events1, events2, full=True)
    finally:
        if verbose:
            _dump("EVENTS1:", events1)
            _dump("EVENTS2:", events2)

# File extension of the fixture this test takes; presumably read by
# test_appliance to locate input files -- confirm against test_appliance.
test_parser_on_canonical.unittest = ['.canonical']
109
def _compare_nodes(node1, node2):
    """Recursively assert that two composed node trees are equal.

    Nodes must have the same class and tag.  Scalar nodes must have equal
    values; for other nodes the children are compared pairwise after
    checking that there are equally many.  A child that is a tuple has each
    of its members compared in order; any other child is compared as a
    single node.

    Raises ``AssertionError`` carrying the offending pair on any mismatch.
    """
    assert node1.__class__ == node2.__class__, (node1, node2)
    assert node1.tag == node2.tag, (node1, node2)

    if isinstance(node1, yaml.ScalarNode):
        assert node1.value == node2.value, (node1, node2)
        return

    assert len(node1.value) == len(node2.value), (node1, node2)
    for child1, child2 in zip(node1.value, node2.value):
        if isinstance(child1, tuple):
            members1, members2 = child1, child2
        else:
            # Wrap single nodes so both cases share the loop below.
            members1, members2 = (child1,), (child2,)
        for member1, member2 in zip(members1, members2):
            _compare_nodes(member1, member2)
123
def test_composer(data_filename, canonical_filename, verbose=False):
    """Compare the node trees composed from a data file and its canonical form.

    *data_filename* is composed with ``yaml.compose_all`` and
    *canonical_filename* with ``yaml.canonical_compose_all``.  Both must
    yield the same number of documents, and each pair of root nodes must
    pass ``_compare_nodes``.

    With *verbose* set, both node lists are pretty-printed whether or not
    the comparison succeeded.
    """
    nodes1 = nodes2 = None
    try:
        with open(data_filename, 'rb') as data_file:
            nodes1 = list(yaml.compose_all(data_file))
        with open(canonical_filename, 'rb') as canonical_file:
            nodes2 = list(yaml.canonical_compose_all(canonical_file))
        assert len(nodes1) == len(nodes2), (len(nodes1), len(nodes2))
        for actual, expected in zip(nodes1, nodes2):
            _compare_nodes(actual, expected)
    finally:
        if verbose:
            for title, nodes in (("NODES1:", nodes1), ("NODES2:", nodes2)):
                print(title)
                pprint.pprint(nodes)

# File extensions of the fixtures this test takes; presumably read by
# test_appliance to pair up input files -- confirm against test_appliance.
test_composer.unittest = ['.data', '.canonical']
143
def _make_loader():
    """Define the module-level ``MyLoader`` class used by ``test_constructor``.

    ``MyLoader`` is a ``yaml.Loader`` that builds easily comparable values:
    sequences become tuples, mappings become lists of pairs sorted by their
    ``str()`` form, and ``construct_undefined`` returns the node's scalar
    value.  ``construct_mapping`` is registered for ``tag:yaml.org,2002:map``
    and ``construct_undefined`` under the ``None`` key.
    """
    global MyLoader

    class MyLoader(yaml.Loader):
        def construct_sequence(self, node):
            items = yaml.Loader.construct_sequence(self, node)
            return tuple(items)

        def construct_mapping(self, node):
            # Sorting by string form gives an order-independent result.
            return sorted(self.construct_pairs(node), key=str)

        def construct_undefined(self, node):
            return self.construct_scalar(node)

    registrations = (
        ('tag:yaml.org,2002:map', MyLoader.construct_mapping),
        (None, MyLoader.construct_undefined),
    )
    for tag, constructor in registrations:
        MyLoader.add_constructor(tag, constructor)
159
def _make_canonical_loader():
    """Define the module-level ``MyCanonicalLoader`` class.

    ``MyCanonicalLoader`` is a ``yaml.CanonicalLoader`` with the same
    overrides as the loader built by ``_make_loader``: sequences become
    tuples, mappings become lists of pairs sorted by their ``str()`` form,
    and ``construct_undefined`` returns the node's scalar value.
    ``construct_mapping`` is registered for ``tag:yaml.org,2002:map`` and
    ``construct_undefined`` under the ``None`` key.
    """
    global MyCanonicalLoader

    class MyCanonicalLoader(yaml.CanonicalLoader):
        def construct_sequence(self, node):
            items = yaml.CanonicalLoader.construct_sequence(self, node)
            return tuple(items)

        def construct_mapping(self, node):
            # Sorting by string form gives an order-independent result.
            ordered = self.construct_pairs(node)
            ordered.sort(key=str)
            return ordered

        def construct_undefined(self, node):
            return self.construct_scalar(node)

    map_tag = 'tag:yaml.org,2002:map'
    MyCanonicalLoader.add_constructor(map_tag, MyCanonicalLoader.construct_mapping)
    MyCanonicalLoader.add_constructor(None, MyCanonicalLoader.construct_undefined)
175
def test_constructor(data_filename, canonical_filename, verbose=False):
    """Compare the native objects built from a data file and its canonical form.

    The loader classes are (re)created through ``_make_loader`` and
    ``_make_canonical_loader``; *data_filename* is then loaded with
    ``MyLoader`` and *canonical_filename* with ``MyCanonicalLoader``, and
    the two lists of documents must be equal.

    With *verbose* set, both results are pretty-printed whether or not the
    comparison succeeded.
    """
    _make_loader()
    _make_canonical_loader()
    native1 = native2 = None
    try:
        with open(data_filename, 'rb') as data_file:
            native1 = list(yaml.load_all(data_file, Loader=MyLoader))
        with open(canonical_filename, 'rb') as canonical_file:
            native2 = list(yaml.load_all(canonical_file, Loader=MyCanonicalLoader))
        assert native1 == native2, (native1, native2)
    finally:
        if verbose:
            for title, native in (("NATIVE1:", native1), ("NATIVE2:", native2)):
                print(title)
                pprint.pprint(native)

# File extensions of the fixtures this test takes; presumably read by
# test_appliance to pair up input files -- confirm against test_appliance.
test_constructor.unittest = ['.data', '.canonical']
195
# When executed as a script, hand this module's globals to test_appliance.run.
# NOTE(review): presumably it discovers the functions carrying a `unittest`
# attribute and runs them -- confirm against test_appliance.
if __name__ == '__main__':
    import test_appliance
    test_appliance.run(globals())
199
200