import io
import json
import logging
import os
import re
from contextlib import contextmanager
from textwrap import indent, wrap
from typing import Any, Dict, Iterator, List, Optional, Sequence, Union, cast

from .fastjsonschema_exceptions import JsonSchemaValueException

_logger = logging.getLogger(__name__)

# Fragments of the original error message that are rewritten in the summary.
_MESSAGE_REPLACEMENTS = {
    "must be named by propertyName definition": "keys must be named by",
    "one of contains definition": "at least one item that matches",
    " same as const definition:": "",
    "only specified items": "only items matching the definition",
}

# When the message contains one of these fragments, the summary is returned
# without the schema excerpt appended.
_SKIP_DETAILS = (
    "must not be empty",
    "is always invalid",
    "must not be there",
)

# Rules for which the summary is complemented with an excerpt of the schema.
_NEED_DETAILS = {"anyOf", "oneOf", "allOf", "contains", "propertyNames", "not", "items"}

_CAMEL_CASE_SPLITTER = re.compile(r"\W+|([A-Z][^A-Z\W]*)")
_IDENTIFIER = re.compile(r"^[\w_]+$", re.I)

# JSON Schema vocabulary => TOML vocabulary
_TOML_JARGON = {
    "object": "table",
    "property": "key",
    "properties": "keys",
    "property names": "keys",
}


class ValidationError(JsonSchemaValueException):
    """Report violations of a given JSON schema.

    This class extends :exc:`~fastjsonschema.JsonSchemaValueException`
    by adding the following properties:

    - ``summary``: an improved version of the ``JsonSchemaValueException`` error message
      (with only the necessary information)

    - ``details``: more contextual information about the error like the failing schema
      itself and the value that violates the schema.

    Depending on the level of the verbosity of the ``logging`` configuration
    the exception message will be only ``summary`` (default) or a combination of
    ``summary`` and ``details`` (when the logging level is set to :obj:`logging.DEBUG`).
    """

    summary = ""
    details = ""
    _original_message = ""

    @classmethod
    def _from_jsonschema(cls, ex: JsonSchemaValueException):
        """Build a ``ValidationError`` out of the given ``JsonSchemaValueException``.

        The cause and traceback of ``ex`` are only carried over when the
        ``JSONSCHEMA_DEBUG_CODE_GENERATION`` environment variable is set to
        something other than ``false`` (case-insensitive).
        """
        formatter = _ErrorFormatting(ex)
        obj = cls(str(formatter), ex.value, formatter.name, ex.definition, ex.rule)
        debug_code = os.getenv("JSONSCHEMA_DEBUG_CODE_GENERATION", "false").lower()
        if debug_code != "false":  # pragma: no cover
            obj.__cause__, obj.__traceback__ = ex.__cause__, ex.__traceback__
        obj._original_message = ex.message
        obj.summary = formatter.summary
        obj.details = formatter.details
        return obj


@contextmanager
def detailed_errors() -> Iterator[None]:
    """Context manager that re-raises any ``JsonSchemaValueException`` raised
    inside the ``with`` body as a :exc:`ValidationError` (with the implicit
    exception context suppressed).
    """
    try:
        yield
    except JsonSchemaValueException as ex:
        raise ValidationError._from_jsonschema(ex) from None


class _ErrorFormatting:
    """Lazily compute the ``summary`` and ``details`` texts for an exception."""

    def __init__(self, ex: JsonSchemaValueException):
        self.ex = ex
        self.name = f"`{self._simplify_name(ex.name)}`"
        self._original_message = self.ex.message.replace(ex.name, self.name)
        self._summary = ""
        self._details = ""

    def __str__(self) -> str:
        """Return ``summary``, followed by ``details`` when the effective level
        of the module logger is ``DEBUG`` or lower.
        """
        if _logger.getEffectiveLevel() <= logging.DEBUG and self.details:
            return f"{self.summary}\n\n{self.details}"

        return self.summary

    @property
    def summary(self) -> str:
        """Short error message (computed on first access, then cached)."""
        if not self._summary:
            self._summary = self._expand_summary()

        return self._summary

    @property
    def details(self) -> str:
        """Long contextual report (computed on first access, then cached)."""
        if not self._details:
            self._details = self._expand_details()

        return self._details

    def _simplify_name(self, name):
        """Strip a leading ``data.`` from ``name`` (if present)."""
        x = len("data.")
        return name[x:] if name.startswith("data.") else name

    def _expand_summary(self) -> str:
        """Rewrite the original message and, for the rules in ``_NEED_DETAILS``,
        append a human-readable rendering of the offending part of the schema.
        """
        msg = self._original_message

        for bad, repl in _MESSAGE_REPLACEMENTS.items():
            msg = msg.replace(bad, repl)

        if any(substring in msg for substring in _SKIP_DETAILS):
            return msg

        schema = self.ex.rule_definition
        if self.ex.rule in _NEED_DETAILS and schema:
            summary = _SummaryWriter(_TOML_JARGON)
            return f"{msg}:\n\n{indent(summary(schema), ' ')}"

        return msg

    def _expand_details(self) -> str:
        """Assemble the description (when available), the given value, the
        offending rule and the schema definition into a single report.
        """
        optional = []
        # Work on a shallow copy: the description keys are left out of the
        # ``DEFINITION`` dump, but the dict owned by the exception (and shared
        # with the resulting ``ValidationError``) must not be modified.
        definition = dict(self.ex.definition)
        desc_lines = definition.pop("$$description", [])
        desc = definition.pop("description", None) or " ".join(desc_lines)
        if desc:
            description = "\n".join(
                wrap(
                    desc,
                    width=80,
                    initial_indent=" ",
                    subsequent_indent=" ",
                    break_long_words=False,
                )
            )
            optional.append(f"DESCRIPTION:\n{description}")
        schema = json.dumps(definition, indent=4)
        value = json.dumps(self.ex.value, indent=4)
        defaults = [
            f"GIVEN VALUE:\n{indent(value, ' ')}",
            f"OFFENDING RULE: {self.ex.rule!r}",
            f"DEFINITION:\n{indent(schema, ' ')}",
        ]
        return "\n\n".join(optional + defaults)


class _SummaryWriter:
    """Callable that renders a JSON schema (or list of schemas) as indented text."""

    # Keys left out of the rendering (unless they name a property)
    _IGNORE = {"description", "default", "title", "examples"}

    def __init__(self, jargon: Optional[Dict[str, str]] = None):
        """
        :param jargon: optional mapping used to replace JSON Schema terms
            with domain-specific ones in the generated text.
        """
        self.jargon: Dict[str, str] = jargon or {}
        # Clarify confusing terms
        self._terms = {
            "anyOf": "at least one of the following",
            "oneOf": "exactly one of the following",
            "allOf": "all of the following",
            "not": "(*NOT* the following)",
            "prefixItems": f"{self._jargon('items')} (in order)",
            "items": "items",
            "contains": "contains at least one of",
            "propertyNames": (
                f"non-predefined acceptable {self._jargon('property names')}"
            ),
            "patternProperties": f"{self._jargon('properties')} named via pattern",
            "const": "predefined value",
            "enum": "one of",
        }
        # Attributes that indicate that the definition is easy and can be done
        # inline (e.g. string and number)
        self._guess_inline_defs = [
            "enum",
            "const",
            "maxLength",
            "minLength",
            "pattern",
            "format",
            "minimum",
            "maximum",
            "exclusiveMinimum",
            "exclusiveMaximum",
            "multipleOf",
        ]

    def _jargon(self, term: Union[str, List[str]]) -> Union[str, List[str]]:
        """Translate a term (or each term of a list) using ``self.jargon``.

        Terms without a translation are returned unchanged.
        """
        if isinstance(term, list):
            return [self.jargon.get(t, t) for t in term]
        return self.jargon.get(term, term)

    def __call__(
        self,
        schema: Union[dict, List[dict]],
        prefix: str = "",
        *,
        _path: Sequence[str] = (),
    ) -> str:
        """Render ``schema`` as text.

        :param schema: schema definition, or list of schema definitions.
        :param prefix: text placed before the first rendered line; the
            following lines are aligned with spaces of the same length.
        :param _path: keys leading from the root to ``schema``
            (used internally during recursion).
        """
        if isinstance(schema, list):
            return self._handle_list(schema, prefix, _path)

        filtered = self._filter_unecessary(schema, _path)
        simple = self._handle_simple_dict(filtered, _path)
        if simple:
            return f"{prefix}{simple}"

        child_prefix = self._child_prefix(prefix, " ")
        item_prefix = self._child_prefix(prefix, "- ")
        # Distinct name: ``indent`` is the ``textwrap`` function imported above
        continuation = len(prefix) * " "
        with io.StringIO() as buffer:
            for i, (key, value) in enumerate(filtered.items()):
                child_path = [*_path, key]
                line_prefix = prefix if i == 0 else continuation
                buffer.write(f"{line_prefix}{self._label(child_path)}:")
                # ^  just the first item should receive the complete prefix
                if isinstance(value, dict):
                    nested = self._filter_unecessary(value, child_path)
                    inline = self._handle_simple_dict(nested, child_path)
                    buffer.write(
                        f" {inline}"
                        if inline
                        else f"\n{self(value, child_prefix, _path=child_path)}"
                    )
                elif isinstance(value, list) and (
                    key != "type" or self._is_property(child_path)
                ):
                    children = self._handle_list(value, item_prefix, child_path)
                    # Short lists are rendered inline (``[...]``), on the same line
                    sep = " " if children.startswith("[") else "\n"
                    buffer.write(f"{sep}{children}")
                else:
                    buffer.write(f" {self._value(value, child_path)}\n")
            return buffer.getvalue()

    def _is_unecessary(self, path: Sequence[str]) -> bool:
        """Check if the last key in ``path`` should be left out of the summary."""
        if self._is_property(path) or not path:  # empty path => instruction @ root
            return False
        key = path[-1]
        return key.startswith(("$", "_")) or key in self._IGNORE

    def _filter_unecessary(self, schema: dict, path: Sequence[str]):
        """Copy ``schema`` without the keys considered unnecessary."""
        return {
            key: value
            for key, value in schema.items()
            if not self._is_unecessary([*path, key])
        }

    def _handle_simple_dict(self, value: dict, path: Sequence[str]) -> Optional[str]:
        """Render ``value`` in a single line (``{label: value, ...}``) when it
        has one of the "inline" attributes or no nested list/dict.

        Returns ``None`` otherwise.
        """
        inline = any(p in value for p in self._guess_inline_defs)
        simple = not any(isinstance(v, (list, dict)) for v in value.values())
        if inline or simple:
            return f"{{{', '.join(self._inline_attrs(value, path))}}}\n"
        return None

    def _handle_list(
        self, schemas: list, prefix: str = "", path: Sequence[str] = ()
    ) -> str:
        """Render a list: inline (via ``repr``) when it has no nested list/dict
        and its ``repr`` is shorter than 60 characters, otherwise one rendered
        entry per element.
        """
        if self._is_unecessary(path):
            return ""

        repr_ = repr(schemas)
        if all(not isinstance(e, (dict, list)) for e in schemas) and len(repr_) < 60:
            return f"{repr_}\n"

        item_prefix = self._child_prefix(prefix, "- ")
        return "".join(
            self(v, item_prefix, _path=[*path, f"[{i}]"]) for i, v in enumerate(schemas)
        )

    def _is_property(self, path: Sequence[str]):
        """Check if the given path can correspond to an arbitrarily named property"""
        counter = 0
        for key in path[-2::-1]:
            if key not in {"properties", "patternProperties"}:
                break
            counter += 1

        # If the counter is even, the path corresponds to a JSON Schema keyword
        # otherwise it can be any arbitrary string naming a property
        return counter % 2 == 1

    def _label(self, path: Sequence[str]) -> str:
        """Text used to introduce the last key of ``path``."""
        *parents, key = path
        if not self._is_property(path):
            norm_key = _separate_terms(key)
            return self._terms.get(key) or " ".join(self._jargon(norm_key))

        if parents[-1] == "patternProperties":
            return f"(regex {key!r})"
        return repr(key)  # property name

    def _value(self, value: Any, path: Sequence[str]) -> str:
        """Text for a leaf value (the ``type`` keyword is translated via jargon)."""
        if path[-1] == "type" and not self._is_property(path):
            type_ = self._jargon(value)
            return (
                f"[{', '.join(type_)}]" if isinstance(value, list) else cast(str, type_)
            )
        return repr(value)

    def _inline_attrs(self, schema: dict, path: Sequence[str]) -> Iterator[str]:
        """Yield one ``label: value`` text per item of ``schema``."""
        for key, value in schema.items():
            child_path = [*path, key]
            yield f"{self._label(child_path)}: {self._value(value, child_path)}"

    def _child_prefix(self, parent_prefix: str, child_prefix: str) -> str:
        """Prefix for a nested element: ``child_prefix`` preceded by as many
        spaces as the length of ``parent_prefix``.
        """
        return len(parent_prefix) * " " + child_prefix


def _separate_terms(word: str) -> List[str]:
    """
    >>> _separate_terms("FooBar-foo")
    ['foo', 'bar', 'foo']
    """
    return [w.lower() for w in _CAMEL_CASE_SPLITTER.split(word) if w]