• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import io
2import json
3import logging
4import os
5import re
6from contextlib import contextmanager
7from textwrap import indent, wrap
8from typing import Any, Dict, Iterator, List, Optional, Sequence, Union, cast
9
10from .fastjsonschema_exceptions import JsonSchemaValueException
11
_logger = logging.getLogger(__name__)

# Fragments of the original validator messages and the friendlier wording
# that should replace them when building the error summary.
_MESSAGE_REPLACEMENTS = {
    "must be named by propertyName definition": "keys must be named by",
    "one of contains definition": "at least one item that matches",
    " same as const definition:": "",
    "only specified items": "only items matching the definition",
}

# Messages containing any of these fragments are reported as-is:
# no outline of the violated rule is appended to the summary.
_SKIP_DETAILS = (
    "must not be empty",
    "is always invalid",
    "must not be there",
)

# Rules whose summary is complemented with an outline of the rule definition.
# NOTE: "anyOf" used to be listed twice (a no-op inside a set literal);
# the combinator that was left out, "allOf", takes the place of the duplicate.
_NEED_DETAILS = {"anyOf", "oneOf", "allOf", "contains", "propertyNames", "not", "items"}

# Splits on runs of non-word characters and captures each capitalised chunk,
# so that camelCase/PascalCase identifiers can be broken into separate words.
_CAMEL_CASE_SPLITTER = re.compile(r"\W+|([A-Z][^A-Z\W]*)")
# Strings made exclusively of word characters (not referenced in this section).
_IDENTIFIER = re.compile(r"^[\w_]+$", re.I)

# JSON Schema vocabulary => TOML vocabulary
_TOML_JARGON = {
    "object": "table",
    "property": "key",
    "properties": "keys",
    "property names": "keys",
}
38
39
class ValidationError(JsonSchemaValueException):
    """Report violations of a given JSON schema.

    This class extends :exc:`~fastjsonschema.JsonSchemaValueException`
    by adding the following properties:

    - ``summary``: an improved version of the ``JsonSchemaValueException`` error
      message (with only the necessary information)

    - ``details``: more contextual information about the error like the failing
      schema itself and the value that violates the schema.

    Depending on the level of the verbosity of the ``logging`` configuration
    the exception message will be only ``summary`` (default) or a combination of
    ``summary`` and ``details`` (when the logging level is set to :obj:`logging.DEBUG`).
    """

    summary = ""
    details = ""
    _original_message = ""

    @classmethod
    def _from_jsonschema(cls, ex: JsonSchemaValueException):
        """Create an instance of this class out of the given exception.

        The message, ``summary`` and ``details`` are produced by
        ``_ErrorFormatting``; the unmodified message of ``ex`` is kept in
        ``_original_message``.
        """
        fmt = _ErrorFormatting(ex)
        error = cls(str(fmt), ex.value, fmt.name, ex.definition, ex.rule)
        error._original_message = ex.message
        error.summary = fmt.summary
        error.details = fmt.details

        # The cause/traceback of the original exception are only carried over
        # when the environment variable is set to something other than "false".
        flag = os.getenv("JSONSCHEMA_DEBUG_CODE_GENERATION", "false")
        if flag.lower() != "false":  # pragma: no cover
            error.__cause__ = ex.__cause__
            error.__traceback__ = ex.__traceback__
        return error
72
73
@contextmanager
def detailed_errors():
    """Convert ``JsonSchemaValueException`` into :class:`ValidationError`.

    Any ``JsonSchemaValueException`` raised inside the ``with`` block is
    replaced by the equivalent :class:`ValidationError`; the original exception
    is not chained (``from None``).
    """
    try:
        yield
    except JsonSchemaValueException as error:
        improved = ValidationError._from_jsonschema(error)
        raise improved from None
80
81
class _ErrorFormatting:
    """Produce the ``summary`` and ``details`` texts for a validation exception.

    Both texts are computed lazily (on first access) and cached.
    ``str(...)`` gives the summary alone, or the summary followed by the
    details when the effective level of the module logger is ``DEBUG`` or lower.
    """

    def __init__(self, ex: "JsonSchemaValueException"):
        self.ex = ex
        # Name of the offending entry, without the leading "data." and
        # surrounded by backticks.
        self.name = f"`{self._simplify_name(ex.name)}`"
        self._original_message = self.ex.message.replace(ex.name, self.name)
        self._summary = ""
        self._details = ""

    def __str__(self) -> str:
        if _logger.getEffectiveLevel() <= logging.DEBUG and self.details:
            return f"{self.summary}\n\n{self.details}"

        return self.summary

    @property
    def summary(self) -> str:
        """Short description of the problem (cached once it is non-empty)."""
        if not self._summary:
            self._summary = self._expand_summary()

        return self._summary

    @property
    def details(self) -> str:
        """Longer description with value, rule and schema (cached once non-empty)."""
        if not self._details:
            self._details = self._expand_details()

        return self._details

    def _simplify_name(self, name):
        """Strip a leading ``"data."`` from ``name``, if present."""
        prefix = "data."
        return name[len(prefix):] if name.startswith(prefix) else name

    def _expand_summary(self):
        """Reword the original message and, when useful, outline the broken rule."""
        msg = self._original_message

        for bad, repl in _MESSAGE_REPLACEMENTS.items():
            msg = msg.replace(bad, repl)

        if any(substring in msg for substring in _SKIP_DETAILS):
            return msg

        schema = self.ex.rule_definition
        if self.ex.rule in _NEED_DETAILS and schema:
            summary = _SummaryWriter(_TOML_JARGON)
            return f"{msg}:\n\n{indent(summary(schema), '    ')}"

        return msg

    def _expand_details(self) -> str:
        """Describe the given value, the offending rule and the schema definition.

        The schema description (``description`` or, failing that, the joined
        ``$$description`` lines) is shown in its own section and left out of the
        dumped definition.
        """
        # Work on a shallow copy: the definition belongs to the exception (and
        # is shared with whoever else holds a reference to it), so removing
        # keys from it here would silently alter it for everybody else.
        # ``or {}`` covers exceptions created without a definition.
        definition = dict(self.ex.definition or {})
        desc_lines = definition.pop("$$description", [])
        desc = definition.pop("description", None) or " ".join(desc_lines)

        optional = []
        if desc:
            description = "\n".join(
                wrap(
                    desc,
                    width=80,
                    initial_indent="    ",
                    subsequent_indent="    ",
                    break_long_words=False,
                )
            )
            optional.append(f"DESCRIPTION:\n{description}")

        schema = json.dumps(definition, indent=4)
        # The offending value is arbitrary user data and may hold objects that
        # JSON cannot represent (e.g. dates); since this text is only meant for
        # display, fall back to ``str`` instead of failing with ``TypeError``.
        value = json.dumps(self.ex.value, indent=4, default=str)
        defaults = [
            f"GIVEN VALUE:\n{indent(value, '    ')}",
            f"OFFENDING RULE: {self.ex.rule!r}",
            f"DEFINITION:\n{indent(schema, '    ')}",
        ]
        return "\n\n".join(optional + defaults)
153
154
class _SummaryWriter:
    """Render a JSON Schema fragment as an indented plain-text outline.

    Instances are callable: ``writer(schema)`` returns the text.
    An optional ``jargon`` mapping replaces schema vocabulary with alternative
    terms in the produced labels and type names.
    """

    _IGNORE = {"description", "default", "title", "examples"}

    def __init__(self, jargon: Optional[Dict[str, str]] = None):
        self.jargon: Dict[str, str] = jargon or {}
        items_term = self._jargon("items")
        keys_term = self._jargon("properties")
        key_names_term = self._jargon("property names")
        # Friendlier wording for keywords that tend to be confusing
        self._terms = {
            "anyOf": "at least one of the following",
            "oneOf": "exactly one of the following",
            "allOf": "all of the following",
            "not": "(*NOT* the following)",
            "prefixItems": f"{items_term} (in order)",
            "items": "items",
            "contains": "contains at least one of",
            "propertyNames": f"non-predefined acceptable {key_names_term}",
            "patternProperties": f"{keys_term} named via pattern",
            "const": "predefined value",
            "enum": "one of",
        }
        # Keywords hinting that a definition is simple enough (e.g. string or
        # number constraints) to be written in a single line
        self._guess_inline_defs = [
            "enum",
            "const",
            "maxLength",
            "minLength",
            "pattern",
            "format",
            "minimum",
            "maximum",
            "exclusiveMinimum",
            "exclusiveMaximum",
            "multipleOf",
        ]

    def _jargon(self, term: Union[str, List[str]]) -> Union[str, List[str]]:
        """Translate a term (or each term of a list) using the jargon mapping."""
        lookup = self.jargon.get
        if not isinstance(term, list):
            return lookup(term, term)
        return [lookup(word, word) for word in term]

    def __call__(
        self,
        schema: Union[dict, List[dict]],
        prefix: str = "",
        *,
        _path: Sequence[str] = (),
    ) -> str:
        """Return the outline for ``schema``, starting its first line with ``prefix``."""
        if isinstance(schema, list):
            return self._handle_list(schema, prefix, _path)

        entries = self._filter_unecessary(schema, _path)
        one_liner = self._handle_simple_dict(entries, _path)
        if one_liner:
            return f"{prefix}{one_liner}"

        nested_prefix = self._child_prefix(prefix, "  ")
        bullet_prefix = self._child_prefix(prefix, "- ")
        padding = " " * len(prefix)
        chunks: List[str] = []
        for position, (key, value) in enumerate(entries.items()):
            child_path = [*_path, key]
            # Only the very first line receives the complete prefix,
            # the following ones are simply aligned with it
            lead = padding if position else prefix
            chunks.append(f"{lead}{self._label(child_path)}:")

            if isinstance(value, dict):
                sub_entries = self._filter_unecessary(value, child_path)
                compact = self._handle_simple_dict(sub_entries, child_path)
                if compact:
                    chunks.append(f" {compact}")
                else:
                    nested = self(value, nested_prefix, _path=child_path)
                    chunks.append(f"\n{nested}")
            elif isinstance(value, list) and not (
                key == "type" and not self._is_property(child_path)
            ):
                children = self._handle_list(value, bullet_prefix, child_path)
                separator = " " if children.startswith("[") else "\n"
                chunks.append(f"{separator}{children}")
            else:
                chunks.append(f" {self._value(value, child_path)}\n")

        return "".join(chunks)

    def _is_unecessary(self, path: Sequence[str]) -> bool:
        """Tell if the entry found at ``path`` should be left out of the outline."""
        if not path or self._is_property(path):
            # empty path => instruction @ root
            return False
        key = path[-1]
        return key.startswith(("$", "_")) or key in self._IGNORE

    def _filter_unecessary(self, schema: dict, path: Sequence[str]):
        """Copy ``schema`` without the entries considered unnecessary."""
        kept = {}
        for key, value in schema.items():
            if self._is_unecessary([*path, key]):
                continue
            kept[key] = value
        return kept

    def _handle_simple_dict(self, value: dict, path: Sequence[str]) -> Optional[str]:
        """Return a ``{...}`` one-line rendering, or ``None`` if not applicable.

        The one-line form is used when ``value`` has any of the "inline"
        keywords, or when none of its values is a list or dict.
        """
        has_inline_hint = any(name in value for name in self._guess_inline_defs)
        is_flat = all(not isinstance(v, (list, dict)) for v in value.values())
        if not (has_inline_hint or is_flat):
            return None
        attrs = ", ".join(self._inline_attrs(value, path))
        return f"{{{attrs}}}\n"

    def _handle_list(
        self, schemas: list, prefix: str = "", path: Sequence[str] = ()
    ) -> str:
        """Render a list: short lists of plain values inline, others item by item."""
        if self._is_unecessary(path):
            return ""

        text = repr(schemas)
        only_plain_values = not any(isinstance(e, (dict, list)) for e in schemas)
        if only_plain_values and len(text) < 60:
            return f"{text}\n"

        item_prefix = self._child_prefix(prefix, "- ")
        rendered = []
        for index, item in enumerate(schemas):
            rendered.append(self(item, item_prefix, _path=[*path, f"[{index}]"]))
        return "".join(rendered)

    def _is_property(self, path: Sequence[str]):
        """Check if the given path can correspond to an arbitrarily named property"""
        depth = 0
        for ancestor in reversed(path[:-1]):
            if ancestor not in {"properties", "patternProperties"}:
                break
            depth += 1

        # An even count means the last element of the path is a JSON Schema
        # keyword, otherwise it can be any arbitrary string naming a property
        return depth % 2 != 0

    def _label(self, path: Sequence[str]) -> str:
        """Return the text used to introduce the entry found at ``path``."""
        *parents, key = path
        if self._is_property(path):
            if parents[-1] == "patternProperties":
                return f"(regex {key!r})"
            return repr(key)  # property name

        words = _separate_terms(key)
        return self._terms.get(key) or " ".join(self._jargon(words))

    def _value(self, value: Any, path: Sequence[str]) -> str:
        """Return the text for a plain value (type names go through the jargon)."""
        if path[-1] != "type" or self._is_property(path):
            return repr(value)

        type_ = self._jargon(value)
        if isinstance(value, list):
            return f"[{', '.join(type_)}]"
        return cast(str, type_)

    def _inline_attrs(self, schema: dict, path: Sequence[str]) -> Iterator[str]:
        """Yield one ``label: value`` text per entry of ``schema``."""
        for key, value in schema.items():
            child_path = [*path, key]
            label = self._label(child_path)
            yield f"{label}: {self._value(value, child_path)}"

    def _child_prefix(self, parent_prefix: str, child_prefix: str) -> str:
        """Pad ``child_prefix`` with as many spaces as ``parent_prefix`` is long."""
        return " " * len(parent_prefix) + child_prefix
311
312
def _separate_terms(word: str) -> List[str]:
    """Split an identifier into its lower-cased words.

    Non-word characters act as separators and each capitalised chunk starts
    a new word. The result is a list (not a joined string):

    >>> _separate_terms("FooBar-foo")
    ['foo', 'bar', 'foo']
    """
    # ``split`` yields empty strings and ``None`` (for separators that did not
    # go through the capture group); both are discarded by the truthiness test.
    return [w.lower() for w in _CAMEL_CASE_SPLITTER.split(word) if w]
319