1 2import yaml._yaml, yaml 3import types, pprint, tempfile, sys, os 4 5yaml.PyBaseLoader = yaml.BaseLoader 6yaml.PySafeLoader = yaml.SafeLoader 7yaml.PyLoader = yaml.Loader 8yaml.PyBaseDumper = yaml.BaseDumper 9yaml.PySafeDumper = yaml.SafeDumper 10yaml.PyDumper = yaml.Dumper 11 12old_scan = yaml.scan 13def new_scan(stream, Loader=yaml.CLoader): 14 return old_scan(stream, Loader) 15 16old_parse = yaml.parse 17def new_parse(stream, Loader=yaml.CLoader): 18 return old_parse(stream, Loader) 19 20old_compose = yaml.compose 21def new_compose(stream, Loader=yaml.CLoader): 22 return old_compose(stream, Loader) 23 24old_compose_all = yaml.compose_all 25def new_compose_all(stream, Loader=yaml.CLoader): 26 return old_compose_all(stream, Loader) 27 28old_load = yaml.load 29def new_load(stream, Loader=yaml.CLoader): 30 return old_load(stream, Loader) 31 32old_load_all = yaml.load_all 33def new_load_all(stream, Loader=yaml.CLoader): 34 return old_load_all(stream, Loader) 35 36old_safe_load = yaml.safe_load 37def new_safe_load(stream): 38 return old_load(stream, yaml.CSafeLoader) 39 40old_safe_load_all = yaml.safe_load_all 41def new_safe_load_all(stream): 42 return old_load_all(stream, yaml.CSafeLoader) 43 44old_emit = yaml.emit 45def new_emit(events, stream=None, Dumper=yaml.CDumper, **kwds): 46 return old_emit(events, stream, Dumper, **kwds) 47 48old_serialize = yaml.serialize 49def new_serialize(node, stream, Dumper=yaml.CDumper, **kwds): 50 return old_serialize(node, stream, Dumper, **kwds) 51 52old_serialize_all = yaml.serialize_all 53def new_serialize_all(nodes, stream=None, Dumper=yaml.CDumper, **kwds): 54 return old_serialize_all(nodes, stream, Dumper, **kwds) 55 56old_dump = yaml.dump 57def new_dump(data, stream=None, Dumper=yaml.CDumper, **kwds): 58 return old_dump(data, stream, Dumper, **kwds) 59 60old_dump_all = yaml.dump_all 61def new_dump_all(documents, stream=None, Dumper=yaml.CDumper, **kwds): 62 return old_dump_all(documents, stream, Dumper, **kwds) 63 64old_safe_dump = yaml.safe_dump 65def new_safe_dump(data, stream=None, **kwds): 66 return old_dump(data, stream, yaml.CSafeDumper, **kwds) 67 68old_safe_dump_all = yaml.safe_dump_all 69def new_safe_dump_all(documents, stream=None, **kwds): 70 return old_dump_all(documents, stream, yaml.CSafeDumper, **kwds) 71 72def _set_up(): 73 yaml.BaseLoader = yaml.CBaseLoader 74 yaml.SafeLoader = yaml.CSafeLoader 75 yaml.Loader = yaml.CLoader 76 yaml.BaseDumper = yaml.CBaseDumper 77 yaml.SafeDumper = yaml.CSafeDumper 78 yaml.Dumper = yaml.CDumper 79 yaml.scan = new_scan 80 yaml.parse = new_parse 81 yaml.compose = new_compose 82 yaml.compose_all = new_compose_all 83 yaml.load = new_load 84 yaml.load_all = new_load_all 85 yaml.safe_load = new_safe_load 86 yaml.safe_load_all = new_safe_load_all 87 yaml.emit = new_emit 88 yaml.serialize = new_serialize 89 yaml.serialize_all = new_serialize_all 90 yaml.dump = new_dump 91 yaml.dump_all = new_dump_all 92 yaml.safe_dump = new_safe_dump 93 yaml.safe_dump_all = new_safe_dump_all 94 95def _tear_down(): 96 yaml.BaseLoader = yaml.PyBaseLoader 97 yaml.SafeLoader = yaml.PySafeLoader 98 yaml.Loader = yaml.PyLoader 99 yaml.BaseDumper = yaml.PyBaseDumper 100 yaml.SafeDumper = yaml.PySafeDumper 101 yaml.Dumper = yaml.PyDumper 102 yaml.scan = old_scan 103 yaml.parse = old_parse 104 yaml.compose = old_compose 105 yaml.compose_all = old_compose_all 106 yaml.load = old_load 107 yaml.load_all = old_load_all 108 yaml.safe_load = old_safe_load 109 yaml.safe_load_all = old_safe_load_all 110 yaml.emit = old_emit 111 yaml.serialize = old_serialize 112 yaml.serialize_all = old_serialize_all 113 yaml.dump = old_dump 114 yaml.dump_all = old_dump_all 115 yaml.safe_dump = old_safe_dump 116 yaml.safe_dump_all = old_safe_dump_all 117 118def test_c_version(verbose=False): 119 if verbose: 120 print(_yaml.get_version()) 121 print(_yaml.get_version_string()) 122 assert ("%s.%s.%s" % yaml._yaml.get_version()) == yaml._yaml.get_version_string(), \ 123 (_yaml.get_version(), yaml._yaml.get_version_string()) 124 125def test_deprecate_yaml_module(): 126 import _yaml 127 assert _yaml.__package__ == '' 128 assert isinstance(_yaml.get_version(), str) 129 130def _compare_scanners(py_data, c_data, verbose): 131 py_tokens = list(yaml.scan(py_data, Loader=yaml.PyLoader)) 132 c_tokens = [] 133 try: 134 for token in yaml.scan(c_data, Loader=yaml.CLoader): 135 c_tokens.append(token) 136 assert len(py_tokens) == len(c_tokens), (len(py_tokens), len(c_tokens)) 137 for py_token, c_token in zip(py_tokens, c_tokens): 138 assert py_token.__class__ == c_token.__class__, (py_token, c_token) 139 if hasattr(py_token, 'value'): 140 assert py_token.value == c_token.value, (py_token, c_token) 141 if isinstance(py_token, yaml.StreamEndToken): 142 continue 143 py_start = (py_token.start_mark.index, py_token.start_mark.line, py_token.start_mark.column) 144 py_end = (py_token.end_mark.index, py_token.end_mark.line, py_token.end_mark.column) 145 c_start = (c_token.start_mark.index, c_token.start_mark.line, c_token.start_mark.column) 146 c_end = (c_token.end_mark.index, c_token.end_mark.line, c_token.end_mark.column) 147 assert py_start == c_start, (py_start, c_start) 148 assert py_end == c_end, (py_end, c_end) 149 finally: 150 if verbose: 151 print("PY_TOKENS:") 152 pprint.pprint(py_tokens) 153 print("C_TOKENS:") 154 pprint.pprint(c_tokens) 155 156def test_c_scanner(data_filename, canonical_filename, verbose=False): 157 with open(data_filename, 'rb') as file1, open(data_filename, 'rb') as file2: 158 _compare_scanners(file1, file2, verbose) 159 with open(data_filename, 'rb') as file1, open(data_filename, 'rb') as file2: 160 _compare_scanners(file1.read(), file2.read(), verbose) 161 with open(canonical_filename, 'rb') as file1, open(canonical_filename, 'rb') as file2: 162 _compare_scanners(file1, file2, verbose) 163 with open(canonical_filename, 'rb') as file1, open(canonical_filename, 'rb') as file2: 164 _compare_scanners(file1.read(), file2.read(), verbose) 165 166test_c_scanner.unittest = ['.data', '.canonical'] 167test_c_scanner.skip = ['.skip-ext'] 168 169def _compare_parsers(py_data, c_data, verbose): 170 py_events = list(yaml.parse(py_data, Loader=yaml.PyLoader)) 171 c_events = [] 172 try: 173 for event in yaml.parse(c_data, Loader=yaml.CLoader): 174 c_events.append(event) 175 assert len(py_events) == len(c_events), (len(py_events), len(c_events)) 176 for py_event, c_event in zip(py_events, c_events): 177 for attribute in ['__class__', 'anchor', 'tag', 'implicit', 178 'value', 'explicit', 'version', 'tags']: 179 py_value = getattr(py_event, attribute, None) 180 c_value = getattr(c_event, attribute, None) 181 assert py_value == c_value, (py_event, c_event, attribute) 182 finally: 183 if verbose: 184 print("PY_EVENTS:") 185 pprint.pprint(py_events) 186 print("C_EVENTS:") 187 pprint.pprint(c_events) 188 189def test_c_parser(data_filename, canonical_filename, verbose=False): 190 with open(data_filename, 'rb') as file1, open(data_filename, 'rb') as file2: 191 _compare_parsers(file1, file2, verbose) 192 with open(data_filename, 'rb') as file1, open(data_filename, 'rb') as file2: 193 _compare_parsers(file1.read(), file2.read(), verbose) 194 with open(canonical_filename, 'rb') as file1, open(canonical_filename, 'rb') as file2: 195 _compare_parsers(file1, file2, verbose) 196 with open(canonical_filename, 'rb') as file1, open(canonical_filename, 'rb') as file2: 197 _compare_parsers(file1.read(), file2.read(), verbose) 198 199test_c_parser.unittest = ['.data', '.canonical'] 200test_c_parser.skip = ['.skip-ext'] 201 202def _compare_emitters(data, verbose): 203 events = list(yaml.parse(data, Loader=yaml.PyLoader)) 204 c_data = yaml.emit(events, Dumper=yaml.CDumper) 205 if verbose: 206 print(c_data) 207 py_events = list(yaml.parse(c_data, Loader=yaml.PyLoader)) 208 c_events = list(yaml.parse(c_data, Loader=yaml.CLoader)) 209 try: 210 assert len(events) == len(py_events), (len(events), len(py_events)) 211 assert len(events) == len(c_events), (len(events), len(c_events)) 212 for event, py_event, c_event in zip(events, py_events, c_events): 213 for attribute in ['__class__', 'anchor', 'tag', 'implicit', 214 'value', 'explicit', 'version', 'tags']: 215 value = getattr(event, attribute, None) 216 py_value = getattr(py_event, attribute, None) 217 c_value = getattr(c_event, attribute, None) 218 if attribute == 'tag' and value in [None, '!'] \ 219 and py_value in [None, '!'] and c_value in [None, '!']: 220 continue 221 if attribute == 'explicit' and (py_value or c_value): 222 continue 223 assert value == py_value, (event, py_event, attribute) 224 assert value == c_value, (event, c_event, attribute) 225 finally: 226 if verbose: 227 print("EVENTS:") 228 pprint.pprint(events) 229 print("PY_EVENTS:") 230 pprint.pprint(py_events) 231 print("C_EVENTS:") 232 pprint.pprint(c_events) 233 234def test_c_emitter(data_filename, canonical_filename, verbose=False): 235 with open(data_filename, 'rb') as file: 236 _compare_emitters(file.read(), verbose) 237 with open(canonical_filename, 'rb') as file: 238 _compare_emitters(file.read(), verbose) 239 240test_c_emitter.unittest = ['.data', '.canonical'] 241test_c_emitter.skip = ['.skip-ext'] 242 243def test_large_file(verbose=False): 244 SIZE_LINE = 24 245 SIZE_ITERATION = 0 246 SIZE_FILE = 31 247 if sys.maxsize <= 2**32: 248 return 249 if os.environ.get('PYYAML_TEST_GROUP', '') != 'all': 250 return 251 with tempfile.TemporaryFile() as temp_file: 252 for i in range(2**(SIZE_FILE-SIZE_ITERATION-SIZE_LINE) + 1): 253 temp_file.write(bytes(('-' + (' ' * (2**SIZE_LINE-4))+ '{}\n')*(2**SIZE_ITERATION), 'utf-8')) 254 temp_file.seek(0) 255 yaml.load(temp_file, Loader=yaml.CLoader) 256 257test_large_file.unittest = None 258 259def wrap_ext_function(function): 260 def wrapper(*args, **kwds): 261 _set_up() 262 try: 263 function(*args, **kwds) 264 finally: 265 _tear_down() 266 wrapper.__name__ = '%s_ext' % function.__name__ 267 wrapper.unittest = function.unittest 268 wrapper.skip = getattr(function, 'skip', [])+['.skip-ext'] 269 return wrapper 270 271def wrap_ext(collections): 272 functions = [] 273 if not isinstance(collections, list): 274 collections = [collections] 275 for collection in collections: 276 if not isinstance(collection, dict): 277 collection = vars(collection) 278 for key in sorted(collection): 279 value = collection[key] 280 if isinstance(value, types.FunctionType) and hasattr(value, 'unittest'): 281 functions.append(wrap_ext_function(value)) 282 for function in functions: 283 assert function.__name__ not in globals() 284 globals()[function.__name__] = function 285 286import test_tokens, test_structure, test_errors, test_resolver, test_constructor, \ 287 test_emitter, test_representer, test_recursive, test_input_output 288wrap_ext([test_tokens, test_structure, test_errors, test_resolver, test_constructor, 289 test_emitter, test_representer, test_recursive, test_input_output]) 290 291if __name__ == '__main__': 292 import test_appliance 293 test_appliance.run(globals()) 294 295