• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2024 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14"""Sensor schema validation tooling."""
15
16from collections.abc import Sequence
17import importlib
18import logging
19from pathlib import Path
20
21import jsonschema  # type: ignore
22import jsonschema.exceptions  # type: ignore
23import yaml
24
def _load_builtin_schema(name: str) -> dict:
    """Load and parse a JSON schema bundled with the pw_sensor package.

    Args:
      name: File name of the schema resource, e.g. "metadata_schema.json".

    Returns:
      The parsed schema object.
    """
    # importlib.resources.read_text() is deprecated since Python 3.11; the
    # files() API is the supported replacement. read_text() here uses an
    # explicit utf-8 encoding to match the deprecated helper's default.
    return yaml.safe_load(
        importlib.resources.files("pw_sensor")
        .joinpath(name)
        .read_text(encoding="utf-8")
    )


# Schema for the user-provided sensor metadata YAML.
_METADATA_SCHEMA = _load_builtin_schema("metadata_schema.json")

# Schema for dependency definition files (channels/attributes/triggers).
_DEPENDENCY_SCHEMA = _load_builtin_schema("dependency_schema.json")

# Schema for the fully resolved output produced by Validator.validate().
_RESOLVED_SCHEMA = _load_builtin_schema("resolved_schema.json")
36
37
class Validator:
    """
    Context used for validating metadata dictionaries.

    Typical usage is ``Validator(include_paths=...).validate(metadata)``.

    What the validator is:
    - A system to resolve and verify that declared sensor metadata is well
      defined and formatted
    - A utility to resolve any and all dependencies when using a specified
      metadata file.

    What the validator is NOT:
    - Code generator
    """
51
52    def __init__(
53        self,
54        include_paths: Sequence[Path] | None = None,
55        log_level: int = logging.WARNING,
56    ) -> None:
57        """
58        Construct a Validator with some context of the current run.
59
60        Args:
61          include_paths: An optional list of directories in which to resolve
62            dependencies
63          log_level: A desired logging level (defaults to logging.WARNING)
64        """
65        self._include_paths = include_paths if include_paths else []
66        self._logger = logging.getLogger(self.__class__.__name__)
67        self._logger.setLevel(log_level)
68
69    def validate(self, metadata: dict) -> dict:
70        """
71        Accept a structured metadata description. This dictionary should first
72        pass the schema provided in metadata_schema.yaml. Then, every channel
73        used by the sensor should be defined in exactly one of the dependencies.
74        Example YAML:
75
76          deps:
77            - "pw_sensor/channels.yaml"
78          compatible:
79            org: "Bosch"
80            part: "BMA4xx
81          channels:
82            acceleration: {}
83            die_temperature: {}
84
85        Args:
86          metadata: Structured sensor data, this will NOT be modified
87
88        Returns:
89          A set of channels and a single sensor which match the schema in
90          resolved_schema.json.
91
92        Raises:
93          RuntimeError: An error in the schema validation or a missing
94            definition.
95          FileNotFoundError: One of the dependencies was not found.
96        """
97        result: dict = {
98            "attributes": {},
99            "channels": {},
100            "triggers": {},
101            "sensors": {},
102        }
103        metadata = metadata.copy()
104        try:
105            jsonschema.validate(instance=metadata, schema=_METADATA_SCHEMA)
106        except jsonschema.exceptions.ValidationError as e:
107            raise RuntimeError(
108                "ERROR: Malformed sensor metadata YAML:\n"
109                f"{yaml.safe_dump(metadata, indent=2)}"
110            ) from e
111
112        # Resolve all the dependencies, after this, 'resolved' will have a
113        # list of channel and attribute specifiers
114        self._resolve_dependencies(metadata=metadata, out=result)
115
116        self._logger.debug(yaml.safe_dump(result, indent=2))
117
118        # Resolve all channel entries
119        self._resolve_channels(metadata=metadata, out=result)
120
121        # Resolve all attribute entries
122        self._resolve_attributes(metadata=metadata, out=result)
123
124        # Resolve all trigger entries
125        self._resolve_triggers(metadata=metadata, out=result)
126
127        compatible = metadata.pop("compatible")
128        channels = metadata.pop("channels")
129        attributes = metadata.pop("attributes")
130        triggers = metadata.pop("triggers")
131        result["sensors"][f"{compatible['org']},{compatible['part']}"] = {
132            "compatible": compatible,
133            "channels": channels,
134            "attributes": attributes,
135            "triggers": triggers,
136        }
137
138        try:
139            jsonschema.validate(instance=result, schema=_RESOLVED_SCHEMA)
140        except jsonschema.exceptions.ValidationError as e:
141            raise RuntimeError(
142                "ERROR: Malformed output YAML: "
143                f"{yaml.safe_dump(result, indent=2)}"
144            ) from e
145
146        return result
147
148    def _resolve_dependencies(self, metadata: dict, out: dict) -> None:
149        """
150        Given a list of dependencies, ensure that each of them exists and
151        matches the schema provided in dependency_schema.yaml. Once loaded, the
152        content of the definition file will be resolved (filling in any missing
153        fields that can be inherited) and the final result will be placed in the
154        'out' dictionary.
155
156        Args:
157          metadata: The full sensor metadata passed to the validate function
158          out: Output dictionary where the resolved dependencies should be
159            stored
160
161        Raises:
162          RuntimeError: An error in the schema validation or a missing
163            definition.
164          FileNotFoundError: One of the dependencies was not found.
165        """
166        deps: None | list[str] = metadata.get("deps")
167        if not deps:
168            self._logger.debug("No dependencies found, skipping imports")
169            out["channels"] = {}
170            out["attributes"] = {}
171            out["triggers"] = {}
172            return
173
174        for dep in deps:
175            dep_file = self._get_dependency_file(dep)
176            with open(dep_file, mode="r", encoding="utf-8") as dep_yaml_file:
177                dep_yaml = yaml.safe_load(dep_yaml_file)
178                try:
179                    jsonschema.validate(
180                        instance=dep_yaml, schema=_DEPENDENCY_SCHEMA
181                    )
182                except jsonschema.exceptions.ValidationError as e:
183                    raise RuntimeError(
184                        "ERROR: Malformed dependency YAML: "
185                        f"{yaml.safe_dump(dep_yaml, indent=2)}"
186                    ) from e
187                self._backfill_declarations(declarations=dep_yaml, out=out)
188
189    def _backfill_declarations(self, declarations: dict, out: dict) -> None:
190        """
191        Add any missing properties of a declaration object.
192
193        Args:
194          declarations: The top level declarations dictionary loaded from the
195            dependency file.
196        """
197        self._backfill_channels(declarations=declarations, out=out)
198        self._backfill_attributes(declarations=declarations, out=out)
199        self._backfill_triggers(declarations=declarations, out=out)
200
201    @staticmethod
202    def _backfill_attributes(declarations: dict, out: dict) -> None:
203        """
204        Move attributes from 'delcarations' to 'out' while also filling in any
205        default values.
206
207        Args:
208          declarations: The original YAML declaring attributes.
209          out: Output dictionary where we'll add the key "attributes" with the
210            result.
211        """
212        if out.get("attributes") is None:
213            out["attributes"] = {}
214        resolved_attributes: dict = out["attributes"]
215        if not declarations.get("attributes"):
216            return
217
218        for attr_id, attribute in declarations["attributes"].items():
219            assert resolved_attributes.get(attr_id) is None
220            resolved_attributes[attr_id] = attribute
221            if not attribute.get("name"):
222                attribute["name"] = attr_id
223            if not attribute.get("description"):
224                attribute["description"] = ""
225            if not attribute["units"].get("name"):
226                attribute["units"]["name"] = attribute["units"]["symbol"]
227
228    @staticmethod
229    def _backfill_channels(declarations: dict, out: dict) -> None:
230        """
231        Move channels from 'declarations' to 'out' while also filling in any
232        default values.
233
234        Args:
235          declarations: The original YAML declaring channels.
236          out: Output dictionary where we'll add the key "channels" with the
237            result.
238        """
239        if out.get("channels") is None:
240            out["channels"] = {}
241        resolved_channels: dict = out["channels"]
242        if not declarations.get("channels"):
243            return
244
245        for chan_id, channel in declarations["channels"].items():
246            assert resolved_channels.get(chan_id) is None
247            resolved_channels[chan_id] = channel
248            if not channel.get("name"):
249                channel["name"] = chan_id
250            if not channel.get("description"):
251                channel["description"] = ""
252            units = channel["units"]
253            if not units.get("name"):
254                units["name"] = units["symbol"]
255            # Resolve sub-channels
256            for sub, sub_channel in channel.get("sub-channels", {}).items():
257                subchan_id = f"{chan_id}_{sub}"
258                if sub_channel.get("name") is None:
259                    sub_channel["name"] = subchan_id
260                if sub_channel.get("description") is None:
261                    sub_channel["description"] = channel.get("description")
262                sub_channel["units"] = channel["units"]
263                resolved_channels[subchan_id] = sub_channel
264            channel.pop("sub-channels", None)
265
266    @staticmethod
267    def _backfill_triggers(declarations: dict, out: dict) -> None:
268        """
269        Move triggers from 'delcarations' to 'out' while also filling in any
270        default values.
271
272        Args:
273          declarations: The original YAML declaring triggers.
274          out: Output dictionary where we'll add the key "triggers" with the
275            result.
276        """
277        if out.get("triggers") is None:
278            out["triggers"] = {}
279        resolved_triggers: dict = out["triggers"]
280        if not declarations.get("triggers"):
281            return
282
283        for trigger_id, trigger in declarations["triggers"].items():
284            assert resolved_triggers.get(trigger_id) is None
285            resolved_triggers[trigger_id] = trigger
286            if not trigger.get("name"):
287                trigger["name"] = trigger_id
288            if not trigger.get("description"):
289                trigger["description"] = ""
290
291    def _resolve_attributes(self, metadata: dict, out: dict) -> None:
292        """
293        For each attribute in the metadta, find the matching definition in the
294        'out/attributes' entry and use the data to fill any missing information.
295        For example, if an entry exists that looks like:
296            sample_rate: {}
297
298        We would then try and find the 'sample_rate' key in the out/attributes
299        list (which was already validated by _resolve_dependencies). Since the
300        example above does not override any fields, we would copy the 'name',
301        'description', and 'units' from the definition into the attribute entry.
302
303        Args:
304          metadata: The full sensor metadata passed to the validate function
305          out: The current output, used to get channel definitions
306
307        Raises:
308          RuntimeError: An error in the schema validation or a missing
309            definition.
310        """
311        attributes: dict | None = metadata.get("attributes")
312        if not attributes:
313            metadata["attributes"] = {}
314            self._logger.debug("No attributes found, skipping")
315            return
316
317        for attribute_name, attribute_value in attributes.items():
318            # Check if the attribute_name exists in 'out/attributes', we can
319            # assume 'out/attributes' exists because _resolve_dependencies() is
320            # required to have been called first.
321            attribute = self._check_scalar_name(
322                name=attribute_name,
323                haystack=out["attributes"],
324                overrides=attribute_value,
325            )
326            # The content of 'attribute' came from the 'out/attributes' list
327            # which was already validated and every field added if missing. At
328            # this point it's safe to access the attribute's name, description,
329            # and units.
330            attribute_value["name"] = attribute["name"]
331            attribute_value["description"] = attribute["description"]
332            attribute_value["units"] = attribute["units"]
333
334    def _resolve_channels(self, metadata: dict, out: dict) -> None:
335        """
336        For each channel in the metadata, find the matching definition in the
337        'out/channels' entry and use the data to fill any missing information.
338        For example, if an entry exists that looks like:
339            acceleration: {}
340
341        We would then try and find the 'acceleration' key in the out/channels
342        dict (which was already validated by _resolve_dependencies). Since the
343        example above does not override any fields, we would copy the 'name',
344        'description', and 'units' from the definition into the channel entry.
345
346        Args:
347          metadata: The full sensor metadata passed to the validate function
348          out: The current output, used to get channel definitions
349
350        Raises:
351          RuntimeError: An error in the schema validation or a missing
352            definition.
353        """
354        channels: dict | None = metadata.get("channels")
355        if not channels:
356            self._logger.debug("No channels found, skipping")
357            metadata["channels"] = {}
358            return
359
360        for channel_name, channel_values in channels.items():
361            # Check if the channel_name exists in 'out/channels', we can assume
362            # 'out/channels' exists because _resolve_dependencies() is required
363            # to have been called first.
364            channel = self._check_scalar_name(
365                name=channel_name,
366                haystack=out["channels"],
367                overrides=channel_values,
368            )
369            # The content of 'channel' came from the 'out/channels' dict which
370            # was already validated and every field added if missing. At this
371            # point it's safe to access the channel's name, description, and
372            # units.
373            channel_values["name"] = channel["name"]
374            channel_values["description"] = channel["description"]
375            channel_values["units"] = channel["units"]
376
377            if not channel_values.get("indicies"):
378                channel_values["indicies"] = [{}]
379            for index in channel_values["indicies"]:
380                if not index.get("name"):
381                    index["name"] = channel_values["name"]
382                if not index.get("description"):
383                    index["description"] = channel_values["description"]
384
385    def _resolve_triggers(self, metadata: dict, out: dict) -> None:
386        """
387        For each trigger in the metadata, find the matching definition in the
388        'out/triggers' entry and use the data to fill any missing information.
389        For example, if an entry exists that looks like:
390            data_ready: {}
391
392        We would then try and find the 'data_ready' key in the out/triggers
393        dict (which was already validated by _resolve_dependencies). Since the
394        example above does not override any fields, we would copy the 'name' and
395        'description' from the definition into the trigger entry.
396
397        Args:
398          metadata: The full sensor metadata passed to the validate function
399          out: The current output, used to get trigger definitions
400
401        Raises:
402          RuntimeError: An error in the schema validation or a missing
403            definition.
404        """
405        triggers: dict | None = metadata.get("triggers")
406        if not triggers:
407            metadata["triggers"] = {}
408            self._logger.debug("No triggers found, skipping")
409            return
410
411        for trigger_name, trigger_value in triggers.items():
412            # Check if the trigger_name exists in 'out/triggers', we can
413            # assume 'out/triggers' exists because _resolve_dependencies() is
414            # required to have been called first.
415            trigger = self._check_scalar_name(
416                name=trigger_name,
417                haystack=out["triggers"],
418                overrides=trigger_value,
419            )
420            # The content of 'trigger' came from the 'out/triggers' dict
421            # which was already validated and every field added if missing. At
422            # this point it's safe to access the trigger's name and description.
423            trigger_value["name"] = trigger["name"]
424            trigger_value["description"] = trigger["description"]
425
426    def _get_dependency_file(self, dep: str) -> Path:
427        """
428        Search for a dependency file and return the full path to it if found.
429
430        Args:
431          dep: The dependency string as provided by the metadata yaml.
432
433        Returns:
434          The dependency file as a Path object if found.
435
436        Raises:
437          FileNotFoundError: One of the dependencies was not found.
438        """
439        error_string = f"Failed to find {dep} using search paths:"
440        # Check if a full path was used
441        if Path(dep).is_file():
442            return Path(dep)
443
444        # Search all the include paths
445        for path in self._include_paths:
446            if (path / dep).is_file():
447                return path / dep
448            error_string += f"\n- {path}"
449
450        raise FileNotFoundError(error_string)
451
452    @staticmethod
453    def _check_scalar_name(name: str, haystack: dict, overrides: dict) -> dict:
454        """
455        Given a name and the resolved list of dependencies, try to find
456        the full definition of a scalar (channel or attribute) with the name,
457        description, and units OR trigger with just a name/description.
458
459        We rely on the schema to ensure that channels and attributes have units
460        so if we can't find units, then we must be looking for a trigger.
461
462        Args:
463          name: The name of the channel/attribute/trigger to search for in the
464            dependencies
465          haystack: The dictionary of resolved properties which define the
466            available channels/attributes/triggers
467
468        Returns:
469          A dictionary with the following structure:
470            name: string
471            description: string
472            (optional)
473            units:
474              name: string
475              symbol: string
476
477        Raises:
478          RuntimeError: If the 'name' isn't in the dependency list
479        """
480        # Check if we can find 'name' in the 'haystack' dictionary
481        if not haystack.get(name):
482            raise RuntimeError(
483                f"Failed to find a definition for '{name}', did you forget a "
484                "dependency?"
485            )
486
487        item = haystack[name]
488        name = overrides.get("name", item.get("name", name))
489        description = overrides.get("description", item.get("description", ""))
490        if item.get("units") is None:
491            return {
492                "name": name,
493                "description": description,
494            }
495
496        units = {
497            "name": item["units"].get("name", item["units"]["symbol"]),
498            "symbol": item["units"]["symbol"],
499        }
500        return {
501            "name": name,
502            "description": description,
503            "units": units,
504        }
505