"""Scanner, parser and loader for the "canonical" YAML test format.

The canonical form handled here consists of an optional ``%YAML 1.1``
directive, ``---`` document starts, flow collections (``[...]`` and
``{? key : value, ...}``), anchors/aliases, tags and double-quoted
scalars.  Importing this module also attaches ``CanonicalLoader`` and
the ``canonical_*`` helper functions to the ``yaml`` package.
"""

import yaml
import yaml.composer
import yaml.constructor
import yaml.resolver


class CanonicalError(yaml.YAMLError):
    """Raised when the input is not valid canonical YAML."""


class CanonicalScanner:
    """Turn canonical YAML text into a list of PyYAML token objects.

    The whole input is tokenized on first use (see ``scan``).  A ``'\\0'``
    sentinel is appended to the data so that most look-ahead reads never
    run off the end of the buffer.
    """

    def __init__(self, data):
        """Store *data* (``str`` or UTF-8 encoded ``bytes``) for scanning.

        Raises:
            CanonicalError: if *data* is ``bytes`` that is not valid UTF-8.
        """
        if isinstance(data, bytes):
            try:
                data = data.decode('utf-8')
            except UnicodeDecodeError as exc:
                raise CanonicalError("utf-8 stream is expected") from exc
        self.data = data + '\0'   # trailing NUL acts as an end-of-input sentinel
        self.index = 0            # current read position in self.data
        self.tokens = []          # pending tokens, consumed from the front
        self.scanned = False      # set once scan() has run to completion

    def check_token(self, *choices):
        """Return True if a token is pending and matches *choices*.

        With no *choices*, any pending token counts as a match.
        """
        if not self.scanned:
            self.scan()
        if not self.tokens:
            return False
        if not choices:
            return True
        return any(isinstance(self.tokens[0], choice) for choice in choices)

    def peek_token(self):
        """Return the next token without consuming it, or None if none is left."""
        if not self.scanned:
            self.scan()
        if self.tokens:
            return self.tokens[0]
        return None

    def get_token(self, choice=None):
        """Consume and return the next token.

        Args:
            choice: optional token class (or tuple of classes) the token
                must be an instance of.

        Raises:
            CanonicalError: if *choice* is given and the token does not match.
            IndexError: if no tokens are left.
        """
        if not self.scanned:
            self.scan()
        token = self.tokens.pop(0)
        if choice and not isinstance(token, choice):
            raise CanonicalError("unexpected token " + repr(token))
        return token

    def get_token_value(self):
        """Consume the next token and return its ``value`` attribute."""
        return self.get_token().value

    # Single-character tokens: the character is consumed and the mapped
    # token class is instantiated with (None, None).
    _PUNCTUATION_TOKENS = {
        '[': yaml.FlowSequenceStartToken,
        '{': yaml.FlowMappingStartToken,
        ']': yaml.FlowSequenceEndToken,
        '}': yaml.FlowMappingEndToken,
        '?': yaml.KeyToken,
        ':': yaml.ValueToken,
        ',': yaml.FlowEntryToken,
    }

    def scan(self):
        """Tokenize the whole input into ``self.tokens``.

        The token list starts with a StreamStartToken and ends with a
        StreamEndToken, which is emitted when a NUL character is reached.
        Every token is created with None for its trailing two positional
        arguments (presumably the start/end marks -- confirm against PyYAML).

        Raises:
            CanonicalError: on any character that cannot start a token, or
                on a malformed directive or scalar.
        """
        self.tokens.append(yaml.StreamStartToken(None, None))
        while True:
            self.find_token()
            ch = self.data[self.index]
            if ch == '\0':
                self.tokens.append(yaml.StreamEndToken(None, None))
                break
            if ch in self._PUNCTUATION_TOKENS:
                self.index += 1
                self.tokens.append(self._PUNCTUATION_TOKENS[ch](None, None))
            elif ch == '%':
                self.tokens.append(self.scan_directive())
            elif ch == '-' and self.data.startswith('---', self.index):
                self.index += 3
                self.tokens.append(yaml.DocumentStartToken(None, None))
            elif ch == '*' or ch == '&':
                self.tokens.append(self.scan_alias())
            elif ch == '!':
                self.tokens.append(self.scan_tag())
            elif ch == '"':
                self.tokens.append(self.scan_scalar())
            else:
                raise CanonicalError("invalid token")
        self.scanned = True

    DIRECTIVE = '%YAML 1.1'

    def scan_directive(self):
        """Scan the only supported directive, ``%YAML 1.1``.

        The directive must be followed by a space, a newline or the end
        of input.

        Raises:
            CanonicalError: for any other directive text.
        """
        after = self.index + len(self.DIRECTIVE)
        # If the prefix matches, ``after`` is at most the sentinel index,
        # so the single-character read below cannot go out of range.
        if self.data.startswith(self.DIRECTIVE, self.index) and \
                self.data[after] in ' \n\0':
            self.index = after
            return yaml.DirectiveToken('YAML', (1, 1), None, None)
        raise CanonicalError("invalid directive")

    def scan_alias(self):
        """Scan ``*name`` into an AliasToken or ``&name`` into an AnchorToken.

        The name extends up to the next comma, space, newline or NUL.
        """
        if self.data[self.index] == '*':
            token_class = yaml.AliasToken
        else:
            token_class = yaml.AnchorToken
        self.index += 1
        start = self.index
        while self.data[self.index] not in ', \n\0':
            self.index += 1
        return token_class(self.data[start:self.index], None, None)

    def scan_tag(self):
        """Scan a tag starting at ``!`` and return a TagToken.

        The text after the leading ``!`` (up to a space, newline or NUL)
        is rewritten as follows:

        * empty        -> ``'!'``
        * ``!suffix``  -> ``'tag:yaml.org,2002:suffix'``
        * ``<uri>``    -> ``'uri'``
        * anything else gets the ``!`` put back in front.
        """
        self.index += 1
        start = self.index
        while self.data[self.index] not in ' \n\0':
            self.index += 1
        value = self.data[start:self.index]
        if not value:
            value = '!'
        elif value[0] == '!':
            value = 'tag:yaml.org,2002:' + value[1:]
        elif value[0] == '<' and value[-1] == '>':
            value = value[1:-1]
        else:
            value = '!' + value
        return yaml.TagToken(value, None, None)

    # Escape letter -> number of hexadecimal digits that follow it.
    QUOTE_CODES = {
        'x': 2,
        'u': 4,
        'U': 8,
    }

    # Single-character escapes -> the text they stand for.
    QUOTE_REPLACES = {
        '\\': '\\',
        '\"': '\"',
        ' ': ' ',
        'a': '\x07',
        'b': '\x08',
        'e': '\x1B',
        'f': '\x0C',
        'n': '\x0A',
        'r': '\x0D',
        't': '\x09',
        'v': '\x0B',
        'N': '\u0085',
        'L': '\u2028',
        'P': '\u2029',
        '_': '_',
        '0': '\x00',
    }

    def scan_scalar(self):
        """Scan a double-quoted scalar and return a ScalarToken.

        Handling inside the quotes:

        * ``\\`` followed by a newline drops the line break;
        * ``\\x``, ``\\u``, ``\\U`` take 2, 4 or 8 hex digits (QUOTE_CODES);
        * other escapes are looked up in QUOTE_REPLACES;
        * an unescaped newline becomes a single space;
        * spaces directly after a line break (escaped or not) are skipped.

        Raises:
            CanonicalError: if the closing quote is missing or an escape
                sequence is unknown or malformed.
        """
        data = self.data
        sentinel = len(data) - 1   # index of the NUL appended in __init__
        self.index += 1            # skip the opening quote
        chunks = []
        start = self.index
        ignore_spaces = False
        while True:
            # Reaching the sentinel means the closing quote never came.
            # An embedded NUL before it is still ordinary scalar content.
            if self.index >= sentinel:
                raise CanonicalError("unterminated scalar")
            ch = data[self.index]
            if ch == '"':
                break
            if ch == '\\':
                ignore_spaces = False
                chunks.append(data[start:self.index])
                self.index += 1
                escape = data[self.index]
                self.index += 1
                if escape == '\n':
                    ignore_spaces = True
                elif escape in self.QUOTE_CODES:
                    length = self.QUOTE_CODES[escape]
                    digits = data[self.index:self.index + length]
                    try:
                        chunks.append(chr(int(digits, 16)))
                    except (ValueError, OverflowError) as exc:
                        # Non-hex or truncated digits, or a code point that
                        # chr() rejects.
                        raise CanonicalError("invalid escape code") from exc
                    self.index += length
                else:
                    if escape not in self.QUOTE_REPLACES:
                        raise CanonicalError("invalid escape code")
                    chunks.append(self.QUOTE_REPLACES[escape])
                start = self.index
            elif ch == '\n':
                chunks.append(data[start:self.index])
                chunks.append(' ')
                self.index += 1
                start = self.index
                ignore_spaces = True
            elif ignore_spaces and ch == ' ':
                self.index += 1
                start = self.index
            else:
                ignore_spaces = False
                self.index += 1
        chunks.append(data[start:self.index])
        self.index += 1            # skip the closing quote
        return yaml.ScalarToken(''.join(chunks), False, None, None)

    def find_token(self):
        """Advance ``self.index`` past spaces, tabs, ``#`` comments and newlines.

        On return the index rests on the first character of the next
        token, or on the NUL that ends the input.
        """
        data = self.data
        while True:
            while data[self.index] in ' \t':
                self.index += 1
            if data[self.index] == '#':
                newline = data.find('\n', self.index)
                # A comment on the last line may lack a trailing newline;
                # in that case stop on the sentinel instead of overrunning.
                self.index = newline if newline != -1 else len(data) - 1
            if data[self.index] == '\n':
                self.index += 1
            else:
                return


class CanonicalParser:
    """Turn canonical tokens into a list of PyYAML event objects.

    This class does not produce tokens itself: it calls ``check_token``,
    ``get_token`` and ``get_token_value`` and reads ``self.tokens``, all
    of which must come from another base class (``CanonicalScanner`` in
    ``CanonicalLoader``).  The whole stream is parsed on first use.
    """

    def __init__(self):
        self.events = []      # pending events, consumed from the front
        self.parsed = False   # set once parse() has run to completion

    def dispose(self):
        """Do nothing; there is no state to release."""
        pass

    # stream: STREAM-START document* STREAM-END
    def parse_stream(self):
        """Parse the whole token stream into ``self.events``.

        Raises:
            CanonicalError: if something other than a document start or a
                directive appears between documents.
        """
        self.get_token(yaml.StreamStartToken)
        self.events.append(yaml.StreamStartEvent(None, None))
        while not self.check_token(yaml.StreamEndToken):
            if self.check_token(yaml.DirectiveToken, yaml.DocumentStartToken):
                self.parse_document()
            else:
                raise CanonicalError("document is expected, got "+repr(self.tokens[0]))
        self.get_token(yaml.StreamEndToken)
        self.events.append(yaml.StreamEndEvent(None, None))

    # document: DIRECTIVE? DOCUMENT-START node
    def parse_document(self):
        """Parse one document; the optional directive token is discarded."""
        if self.check_token(yaml.DirectiveToken):
            self.get_token(yaml.DirectiveToken)
        self.get_token(yaml.DocumentStartToken)
        self.events.append(yaml.DocumentStartEvent(None, None))
        self.parse_node()
        self.events.append(yaml.DocumentEndEvent(None, None))

    # node: ALIAS | ANCHOR? TAG? (SCALAR|sequence|mapping)
    def parse_node(self):
        """Parse a single node and append its event(s).

        Raises:
            CanonicalError: if the node content is not a scalar, ``[`` or ``{``.
        """
        if self.check_token(yaml.AliasToken):
            self.events.append(yaml.AliasEvent(self.get_token_value(), None, None))
            return
        anchor = None
        if self.check_token(yaml.AnchorToken):
            anchor = self.get_token_value()
        tag = None
        if self.check_token(yaml.TagToken):
            tag = self.get_token_value()
        if self.check_token(yaml.ScalarToken):
            self.events.append(yaml.ScalarEvent(anchor, tag, (False, False), self.get_token_value(), None, None))
        elif self.check_token(yaml.FlowSequenceStartToken):
            self.events.append(yaml.SequenceStartEvent(anchor, tag, None, None))
            self.parse_sequence()
        elif self.check_token(yaml.FlowMappingStartToken):
            self.events.append(yaml.MappingStartEvent(anchor, tag, None, None))
            self.parse_mapping()
        else:
            raise CanonicalError("SCALAR, '[', or '{' is expected, got "+repr(self.tokens[0]))

    # sequence: SEQUENCE-START (node (ENTRY node)*)? ENTRY? SEQUENCE-END
    def parse_sequence(self):
        """Parse ``[ node, node, ... ]``; a trailing comma is allowed."""
        self.get_token(yaml.FlowSequenceStartToken)
        if not self.check_token(yaml.FlowSequenceEndToken):
            self.parse_node()
            while not self.check_token(yaml.FlowSequenceEndToken):
                self.get_token(yaml.FlowEntryToken)
                if not self.check_token(yaml.FlowSequenceEndToken):
                    self.parse_node()
        self.get_token(yaml.FlowSequenceEndToken)
        self.events.append(yaml.SequenceEndEvent(None, None))

    # mapping: MAPPING-START (map_entry (ENTRY map_entry)*)? ENTRY? MAPPING-END
    def parse_mapping(self):
        """Parse ``{ entry, entry, ... }``; a trailing comma is allowed."""
        self.get_token(yaml.FlowMappingStartToken)
        if not self.check_token(yaml.FlowMappingEndToken):
            self.parse_map_entry()
            while not self.check_token(yaml.FlowMappingEndToken):
                self.get_token(yaml.FlowEntryToken)
                if not self.check_token(yaml.FlowMappingEndToken):
                    self.parse_map_entry()
        self.get_token(yaml.FlowMappingEndToken)
        self.events.append(yaml.MappingEndEvent(None, None))

    # map_entry: KEY node VALUE node
    def parse_map_entry(self):
        """Parse one ``? key : value`` pair."""
        self.get_token(yaml.KeyToken)
        self.parse_node()
        self.get_token(yaml.ValueToken)
        self.parse_node()

    def parse(self):
        """Parse the complete stream and mark the parser as done."""
        self.parse_stream()
        self.parsed = True

    def get_event(self):
        """Consume and return the next event (IndexError if none is left)."""
        if not self.parsed:
            self.parse()
        return self.events.pop(0)

    def check_event(self, *choices):
        """Return True if an event is pending and matches *choices*.

        With no *choices*, any pending event counts as a match.
        """
        if not self.parsed:
            self.parse()
        if not self.events:
            return False
        if not choices:
            return True
        return any(isinstance(self.events[0], choice) for choice in choices)

    def peek_event(self):
        """Return the next event without consuming it (IndexError if none is left)."""
        if not self.parsed:
            self.parse()
        return self.events[0]


class CanonicalLoader(CanonicalScanner, CanonicalParser,
        yaml.composer.Composer, yaml.constructor.Constructor, yaml.resolver.Resolver):
    """Loader combining the canonical scanner/parser with PyYAML's
    composer, constructor and resolver.

    NOTE(review): ``yaml.constructor.Constructor`` is presumably PyYAML's
    unrestricted constructor -- confirm before using this loader (or the
    ``canonical_load*`` helpers below) on untrusted input.
    """

    def __init__(self, stream):
        """Create a loader for *stream*.

        *stream* may be a ``str``, ``bytes`` or any object with a
        ``read()`` method; file-like objects are read in full up front.
        """
        if hasattr(stream, 'read'):
            stream = stream.read()
        # The bases are initialised explicitly, one by one, because they
        # take different constructor arguments.
        CanonicalScanner.__init__(self, stream)
        CanonicalParser.__init__(self)
        yaml.composer.Composer.__init__(self)
        yaml.constructor.Constructor.__init__(self)
        yaml.resolver.Resolver.__init__(self)

yaml.CanonicalLoader = CanonicalLoader


def canonical_scan(stream):
    """Call ``yaml.scan`` on *stream* using ``CanonicalLoader``."""
    return yaml.scan(stream, Loader=CanonicalLoader)

yaml.canonical_scan = canonical_scan


def canonical_parse(stream):
    """Call ``yaml.parse`` on *stream* using ``CanonicalLoader``."""
    return yaml.parse(stream, Loader=CanonicalLoader)

yaml.canonical_parse = canonical_parse


def canonical_compose(stream):
    """Call ``yaml.compose`` on *stream* using ``CanonicalLoader``."""
    return yaml.compose(stream, Loader=CanonicalLoader)

yaml.canonical_compose = canonical_compose


def canonical_compose_all(stream):
    """Call ``yaml.compose_all`` on *stream* using ``CanonicalLoader``."""
    return yaml.compose_all(stream, Loader=CanonicalLoader)

yaml.canonical_compose_all = canonical_compose_all


def canonical_load(stream):
    """Call ``yaml.load`` on *stream* using ``CanonicalLoader``."""
    return yaml.load(stream, Loader=CanonicalLoader)

yaml.canonical_load = canonical_load


def canonical_load_all(stream):
    """Call ``yaml.load_all`` on *stream* using ``CanonicalLoader``."""
    return yaml.load_all(stream, Loader=CanonicalLoader)

yaml.canonical_load_all = canonical_load_all