1 2import yaml, canonical 3import pprint 4 5def _convert_structure(loader): 6 if loader.check_event(yaml.ScalarEvent): 7 event = loader.get_event() 8 if event.tag or event.anchor or event.value: 9 return True 10 else: 11 return None 12 elif loader.check_event(yaml.SequenceStartEvent): 13 loader.get_event() 14 sequence = [] 15 while not loader.check_event(yaml.SequenceEndEvent): 16 sequence.append(_convert_structure(loader)) 17 loader.get_event() 18 return sequence 19 elif loader.check_event(yaml.MappingStartEvent): 20 loader.get_event() 21 mapping = [] 22 while not loader.check_event(yaml.MappingEndEvent): 23 key = _convert_structure(loader) 24 value = _convert_structure(loader) 25 mapping.append((key, value)) 26 loader.get_event() 27 return mapping 28 elif loader.check_event(yaml.AliasEvent): 29 loader.get_event() 30 return '*' 31 else: 32 loader.get_event() 33 return '?' 34 35def test_structure(data_filename, structure_filename, verbose=False): 36 nodes1 = [] 37 with open(structure_filename, 'r') as file: 38 nodes2 = eval(file.read()) 39 try: 40 with open(data_filename, 'rb') as file: 41 loader = yaml.Loader(file) 42 while loader.check_event(): 43 if loader.check_event( 44 yaml.StreamStartEvent, yaml.StreamEndEvent, 45 yaml.DocumentStartEvent, yaml.DocumentEndEvent 46 ): 47 loader.get_event() 48 continue 49 nodes1.append(_convert_structure(loader)) 50 if len(nodes1) == 1: 51 nodes1 = nodes1[0] 52 assert nodes1 == nodes2, (nodes1, nodes2) 53 finally: 54 if verbose: 55 print("NODES1:") 56 pprint.pprint(nodes1) 57 print("NODES2:") 58 pprint.pprint(nodes2) 59 60test_structure.unittest = ['.data', '.structure'] 61 62def _compare_events(events1, events2, full=False): 63 assert len(events1) == len(events2), (len(events1), len(events2)) 64 for event1, event2 in zip(events1, events2): 65 assert event1.__class__ == event2.__class__, (event1, event2) 66 if isinstance(event1, yaml.AliasEvent) and full: 67 assert event1.anchor == event2.anchor, (event1, event2) 68 if isinstance(event1, (yaml.ScalarEvent, yaml.CollectionStartEvent)): 69 if (event1.tag not in [None, '!'] and event2.tag not in [None, '!']) or full: 70 assert event1.tag == event2.tag, (event1, event2) 71 if isinstance(event1, yaml.ScalarEvent): 72 assert event1.value == event2.value, (event1, event2) 73 74def test_parser(data_filename, canonical_filename, verbose=False): 75 events1 = None 76 events2 = None 77 try: 78 with open(data_filename, 'rb') as file: 79 events1 = list(yaml.parse(file)) 80 with open(canonical_filename, 'rb') as file: 81 events2 = list(yaml.canonical_parse(file)) 82 _compare_events(events1, events2) 83 finally: 84 if verbose: 85 print("EVENTS1:") 86 pprint.pprint(events1) 87 print("EVENTS2:") 88 pprint.pprint(events2) 89 90test_parser.unittest = ['.data', '.canonical'] 91 92def test_parser_on_canonical(canonical_filename, verbose=False): 93 events1 = None 94 events2 = None 95 try: 96 with open(canonical_filename, 'rb') as file: 97 events1 = list(yaml.parse(file)) 98 with open(canonical_filename, 'rb') as file: 99 events2 = list(yaml.canonical_parse(file)) 100 _compare_events(events1, events2, full=True) 101 finally: 102 if verbose: 103 print("EVENTS1:") 104 pprint.pprint(events1) 105 print("EVENTS2:") 106 pprint.pprint(events2) 107 108test_parser_on_canonical.unittest = ['.canonical'] 109 110def _compare_nodes(node1, node2): 111 assert node1.__class__ == node2.__class__, (node1, node2) 112 assert node1.tag == node2.tag, (node1, node2) 113 if isinstance(node1, yaml.ScalarNode): 114 assert node1.value == node2.value, (node1, node2) 115 else: 116 assert len(node1.value) == len(node2.value), (node1, node2) 117 for item1, item2 in zip(node1.value, node2.value): 118 if not isinstance(item1, tuple): 119 item1 = (item1,) 120 item2 = (item2,) 121 for subnode1, subnode2 in zip(item1, item2): 122 _compare_nodes(subnode1, subnode2) 123 124def test_composer(data_filename, canonical_filename, verbose=False): 125 nodes1 = None 126 nodes2 = None 127 try: 128 with open(data_filename, 'rb') as file: 129 nodes1 = list(yaml.compose_all(file)) 130 with open(canonical_filename, 'rb') as file: 131 nodes2 = list(yaml.canonical_compose_all(file)) 132 assert len(nodes1) == len(nodes2), (len(nodes1), len(nodes2)) 133 for node1, node2 in zip(nodes1, nodes2): 134 _compare_nodes(node1, node2) 135 finally: 136 if verbose: 137 print("NODES1:") 138 pprint.pprint(nodes1) 139 print("NODES2:") 140 pprint.pprint(nodes2) 141 142test_composer.unittest = ['.data', '.canonical'] 143 144def _make_loader(): 145 global MyLoader 146 147 class MyLoader(yaml.Loader): 148 def construct_sequence(self, node): 149 return tuple(yaml.Loader.construct_sequence(self, node)) 150 def construct_mapping(self, node): 151 pairs = self.construct_pairs(node) 152 pairs.sort(key=(lambda i: str(i))) 153 return pairs 154 def construct_undefined(self, node): 155 return self.construct_scalar(node) 156 157 MyLoader.add_constructor('tag:yaml.org,2002:map', MyLoader.construct_mapping) 158 MyLoader.add_constructor(None, MyLoader.construct_undefined) 159 160def _make_canonical_loader(): 161 global MyCanonicalLoader 162 163 class MyCanonicalLoader(yaml.CanonicalLoader): 164 def construct_sequence(self, node): 165 return tuple(yaml.CanonicalLoader.construct_sequence(self, node)) 166 def construct_mapping(self, node): 167 pairs = self.construct_pairs(node) 168 pairs.sort(key=(lambda i: str(i))) 169 return pairs 170 def construct_undefined(self, node): 171 return self.construct_scalar(node) 172 173 MyCanonicalLoader.add_constructor('tag:yaml.org,2002:map', MyCanonicalLoader.construct_mapping) 174 MyCanonicalLoader.add_constructor(None, MyCanonicalLoader.construct_undefined) 175 176def test_constructor(data_filename, canonical_filename, verbose=False): 177 _make_loader() 178 _make_canonical_loader() 179 native1 = None 180 native2 = None 181 try: 182 with open(data_filename, 'rb') as file: 183 native1 = list(yaml.load_all(file, Loader=MyLoader)) 184 with open(canonical_filename, 'rb') as file: 185 native2 = list(yaml.load_all(file, Loader=MyCanonicalLoader)) 186 assert native1 == native2, (native1, native2) 187 finally: 188 if verbose: 189 print("NATIVE1:") 190 pprint.pprint(native1) 191 print("NATIVE2:") 192 pprint.pprint(native2) 193 194test_constructor.unittest = ['.data', '.canonical'] 195 196if __name__ == '__main__': 197 import test_appliance 198 test_appliance.run(globals()) 199 200