# Copyright 2024 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Sensor schema validation tooling."""

from collections.abc import Sequence
import importlib.resources
import logging
from pathlib import Path
import re

import jsonschema  # type: ignore
import jsonschema.exceptions  # type: ignore
import yaml

_METADATA_SCHEMA = yaml.safe_load(
    importlib.resources.read_text("pw_sensor", "metadata_schema.json")
)

_DEPENDENCY_SCHEMA = yaml.safe_load(
    importlib.resources.read_text("pw_sensor", "dependency_schema.json")
)

_RESOLVED_SCHEMA = yaml.safe_load(
    importlib.resources.read_text("pw_sensor", "resolved_schema.json")
)


class Validator:
    """
    Context used for validating metadata dictionaries.

    What the validator is:
    - A system that resolves declared sensor metadata and verifies that it is
      well defined and correctly formatted
    - A utility that resolves all dependencies of a given metadata file

    What the validator is NOT:
    - A code generator
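
    Example usage (a minimal sketch; the metadata file name and include path
    are hypothetical):

        validator = Validator(include_paths=[Path("sensor_defs")])
        metadata = yaml.safe_load(Path("bma4xx.yaml").read_text())
        resolved = validator.validate(metadata)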
    """

    def __init__(
        self,
        include_paths: Sequence[Path] | None = None,
        log_level: int = logging.WARNING,
    ) -> None:
        """
        Construct a Validator with the context for the current run.

        Args:
          include_paths: An optional list of directories in which to resolve
            dependencies
          log_level: The desired logging level (defaults to logging.WARNING)
        """
        self._include_paths = include_paths if include_paths else []
        self._logger = logging.getLogger(self.__class__.__name__)
        self._logger.setLevel(log_level)

    def validate(self, metadata: dict) -> dict:
        """
        Validate a structured metadata description. The dictionary must first
        pass the schema provided in metadata_schema.json. Then, every channel
        used by the sensor must be defined in exactly one of the dependencies.
        Example YAML:

          deps:
            - "pw_sensor/channels.yaml"
          compatible:
            org: "Bosch"
            part: "BMA4xx"
          supported-buses:
            - i2c
          channels:
            acceleration: []
            die_temperature: []

        Args:
          metadata: Structured sensor data; this will NOT be modified

        Returns:
          A set of attributes, channels, triggers, and units along with a
          single sensor which match the schema in resolved_schema.json.
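
          For example, the returned dictionary has this shape (the sensor key
          shown is hypothetical):

            {
              "attributes": {...},
              "channels": {...},
              "triggers": {...},
              "units": {...},
              "sensors": {"bosch,bma4xx": {...}},
            }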

        Raises:
          RuntimeError: An error in the schema validation or a missing
            definition.
          FileNotFoundError: One of the dependencies was not found.
        """
        result: dict = {
            "attributes": {},
            "channels": {},
            "triggers": {},
            "units": {},
            "sensors": {},
        }
        # Shallow copy so the caller's top-level dictionary is not modified.
        metadata = metadata.copy()

        # Validate the incoming schema
        try:
            jsonschema.validate(instance=metadata, schema=_METADATA_SCHEMA)
        except jsonschema.exceptions.ValidationError as e:
            raise RuntimeError(
                "ERROR: Malformed sensor metadata YAML:\n"
                f"{yaml.safe_dump(metadata, indent=2)}"
            ) from e

        # Resolve all the dependencies; after this, 'result' will have all the
        # missing properties for which defaults can be provided
        self._resolve_dependencies(metadata=metadata, out=result)

        self._logger.debug(
            "Resolved dependencies:\n%s", yaml.safe_dump(result, indent=2)
        )

        # Resolve all channel entries (must be done before attributes)
        self._resolve_channels(metadata=metadata, out=result)

        # Resolve all trigger entries (must be done before attributes)
        self._resolve_triggers(metadata=metadata, out=result)

        # Resolve all attribute entries
        self._resolve_attributes(metadata=metadata, out=result)

        compatible, compatible_str = Validator._get_compatible_string_and_dict(
            metadata.pop("compatible")
        )
        supported_buses = metadata.pop("supported-buses")
        channels = metadata.pop("channels")
        attributes = metadata.pop("attributes")
        triggers = metadata.pop("triggers")

        result["sensors"][compatible_str] = {
            "compatible": compatible,
            "supported-buses": self._normalize_supported_buses(supported_buses),
            "channels": channels,
            "attributes": attributes,
            "triggers": triggers,
            "description": metadata.get("description", ""),
            "extras": metadata.get("extras", {}),
        }

        # Validate the final output before returning
        try:
            jsonschema.validate(instance=result, schema=_RESOLVED_SCHEMA)
        except jsonschema.exceptions.ValidationError as e:
            msg = (
                "ERROR: Malformed output YAML: "
                f"{yaml.safe_dump(result, indent=2)}"
            )
            raise RuntimeError(msg) from e

        return result

    @staticmethod
    def _normalize_supported_buses(buses: list[str]) -> list[str]:
        """Normalize a list of supported buses.

        Each bus string is converted to lowercase and every run of whitespace
        and '-' characters is replaced by a single '_'.

        Args:
            buses: A list of the supported sensor buses

        Returns:
            Normalized list of buses
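
        For example (the result order is not guaranteed, since a set is used
        for de-duplication):

            ["I2C", "SPI - 4"] normalizes to ["i2c", "spi_4"]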

        """
        filtered_list = list(
            {re.sub(r"[\s\-]+", "_", s.lower()) for s in buses}
        )
        if len(buses) != len(filtered_list):
            error = (
                "ERROR: bus list contains duplicates once normalized to "
                f"lowercase with '_' separators: {sorted(buses)} -> "
                f"{sorted(filtered_list)}"
            )
            raise RuntimeError(error)
        return filtered_list

    @staticmethod
    def _get_compatible_string_and_dict(
        compatible: dict[str, str],
    ) -> tuple[dict[str, str], str]:
        """
        Normalize compatible info.

        This function processes a 'compatible' dictionary with a 'part' key and
        an optional 'org' key. It returns a new dictionary with the 'org' key
        removed if it was empty or missing, and a formatted string based on the
        'org' key's presence and value.

        Args:
            compatible (dict[str, str]): A dictionary with a 'part' key and an
                optional 'org' key.

        Returns:
            tuple[dict[str, str], str]: A tuple containing:
            - A new dictionary with the 'org' key removed if it was empty or
              missing.
            - A formatted string:
              - "{org},{part}" if 'org' exists and is not empty (after trimming)
              - "{part}" otherwise.
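
            Example:
                {"org": " Bosch ", "part": "BMA4xx"} resolves to
                ({"part": "bma4xx", "org": "bosch"}, "bosch,bma4xx")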

        """
        part = compatible["part"].lower()
        org = compatible.get("org", "").strip().lower()

        new_compatible = {"part": part}
        if org:
            new_compatible["org"] = org
            return new_compatible, f"{org},{part}"
        return new_compatible, part

    def _resolve_dependencies(self, metadata: dict, out: dict) -> None:
        """
        Given a list of dependencies, ensure that each of them exists and
        matches the schema provided in dependency_schema.json. Once loaded, the
        content of the definition file is resolved (filling in any missing
        fields that can be inherited) and the final result is placed in the
        'out' dictionary.

        Args:
          metadata: The full sensor metadata passed to the validate function
          out: Output dictionary where the resolved dependencies should be
            stored
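
        Example dependency file (a hypothetical sketch; the ids and symbol are
        made up, but the categories match the backfill helpers below):

          units:
            acceleration:
              symbol: "m/s^2"
          channels:
            acceleration:
              units: "acceleration"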

        Raises:
          RuntimeError: An error in the schema validation or a missing
            definition.
          FileNotFoundError: One of the dependencies was not found.
        """
        deps: None | list[str] = metadata.get("deps")
        if not deps:
            self._logger.debug("No dependencies found, skipping imports")
            return

        merged_deps: dict = {
            "attributes": {},
            "channels": {},
            "triggers": {},
            "units": {},
        }
        for dep in deps:
            # Load each of the dependencies, then merge them. This avoids any
            # include dependency order issues.
            dep_file = self._get_dependency_file(dep)
            with open(dep_file, mode="r", encoding="utf-8") as dep_yaml_file:
                dep_yaml = yaml.safe_load(dep_yaml_file)
                try:
                    jsonschema.validate(
                        instance=dep_yaml, schema=_DEPENDENCY_SCHEMA
                    )
                except jsonschema.exceptions.ValidationError as e:
                    raise RuntimeError(
                        "ERROR: Malformed dependency YAML: "
                        f"{yaml.safe_dump(dep_yaml, indent=2)}"
                    ) from e
                # Merge all the loaded values into 'merged_deps'
                for category in merged_deps:
                    self._merge_deps(
                        category=category,
                        dep_yaml=dep_yaml,
                        merged_deps=merged_deps,
                    )
        # Backfill any default values from the merged dependencies and put them
        # into 'out'
        self._backfill_declarations(declarations=merged_deps, out=out)

    @staticmethod
    def _merge_deps(category: str, dep_yaml: dict, merged_deps: dict) -> None:
        """
        Pull all properties from dep_yaml[category] and put them into
        merged_deps after validating that no duplicate keys exist.

        Args:
          category: The key of dep_yaml and merged_deps to merge
          dep_yaml: The newly loaded dependency YAML
          merged_deps: The accumulated dependency map
        """
        for key, value in dep_yaml.get(category, {}).items():
            assert (
                key not in merged_deps[category]
            ), f"'{key}' was already found under '{category}'"
            merged_deps[category][key] = value

    def _backfill_declarations(self, declarations: dict, out: dict) -> None:
        """
        Add any missing properties of a declaration object.

        Args:
          declarations: The top level declarations dictionary loaded from the
            dependency file.
          out: The already resolved map of all defined dependencies
        """
        self._backfill_units(declarations=declarations, out=out)
        self._backfill_channels(declarations=declarations, out=out)
        self._backfill_attributes(declarations=declarations, out=out)
        self._backfill_triggers(declarations=declarations, out=out)

    @staticmethod
    def _backfill_units(declarations: dict, out: dict) -> None:
        """
        Move units from 'declarations' to 'out' while also filling in any
        default values.

        Args:
          declarations: The original YAML declaring units.
          out: Output dictionary where we'll add the key "units" with the
            result.
        """
        if out.get("units") is None:
            out["units"] = {}
        resolved_units: dict = out["units"]
        if not declarations.get("units"):
            return

        for units_id, unit in declarations["units"].items():
            # Copy unit to resolved_units and fill any default values
            assert resolved_units.get(units_id) is None
            resolved_units[units_id] = unit
            if not unit.get("name"):
                unit["name"] = unit["symbol"]
            if unit.get("description") is None:
                unit["description"] = ""

    @staticmethod
    def _backfill_attributes(declarations: dict, out: dict) -> None:
        """
        Move attributes from 'declarations' to 'out' while also filling in any
        default values.

        Args:
          declarations: The original YAML declaring attributes.
          out: Output dictionary where we'll add the key "attributes" with the
            result.
        """
        if out.get("attributes") is None:
            out["attributes"] = {}
        resolved_attributes: dict = out["attributes"]
        if not declarations.get("attributes"):
            return

        for attr_id, attribute in declarations["attributes"].items():
            # Copy attribute to resolved_attributes and fill any default values
            assert resolved_attributes.get(attr_id) is None
            resolved_attributes[attr_id] = attribute
            if not attribute.get("name"):
                attribute["name"] = attr_id
            if not attribute.get("description"):
                attribute["description"] = ""

    @staticmethod
    def _backfill_channels(declarations: dict, out: dict) -> None:
        """
        Move channels from 'declarations' to 'out' while also filling in any
        default values.

        Args:
          declarations: The original YAML declaring channels.
          out: Output dictionary where we'll add the key "channels" with the
            result.
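
        Example (hypothetical ids): a declaration such as

            channels:
              acceleration:
                units: "acceleration"

        is backfilled to

            channels:
              acceleration:
                name: "acceleration"
                description: ""
                units: "acceleration"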
        """
        if out.get("channels") is None:
            out["channels"] = {}
        resolved_channels: dict = out["channels"]
        if not declarations.get("channels"):
            return

        for chan_id, channel in declarations["channels"].items():
            # Copy channel to resolved_channels and fill any default values
            assert resolved_channels.get(chan_id) is None
            resolved_channels[chan_id] = channel
            if not channel.get("name"):
                channel["name"] = chan_id
            if not channel.get("description"):
                channel["description"] = ""
            assert channel["units"] in out["units"], (
                f"'{channel['units']}' not found in\n"
                + f"{yaml.safe_dump(out.get('units', {}), indent=2)}"
            )

    @staticmethod
    def _backfill_triggers(declarations: dict, out: dict) -> None:
        """
        Move triggers from 'declarations' to 'out' while also filling in any
        default values.

        Args:
          declarations: The original YAML declaring triggers.
          out: Output dictionary where we'll add the key "triggers" with the
            result.
        """
        if out.get("triggers") is None:
            out["triggers"] = {}
        resolved_triggers: dict = out["triggers"]
        if not declarations.get("triggers"):
            return

        for trigger_id, trigger in declarations["triggers"].items():
            # Copy trigger to resolved_triggers and fill any default values
            assert resolved_triggers.get(trigger_id) is None
            resolved_triggers[trigger_id] = trigger
            if not trigger.get("name"):
                trigger["name"] = trigger_id
            if not trigger.get("description"):
                trigger["description"] = ""

    def _resolve_attributes(self, metadata: dict, out: dict) -> None:
        """Resolve and validate the attribute entries in the metadata.

        For each attribute entry in the metadata, verify that the referenced
        attribute and units (and, when present, the channel or trigger) are
        defined in the corresponding 'out' entries (which were already
        populated by _resolve_dependencies). An entry may reference a channel
        OR a trigger, but never both.
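
        Example entry (hypothetical ids, which must be defined by a
        dependency file):
            - attribute: "sample_rate"
              units: "frequency"
              channel: "acceleration"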

        Args:
          metadata: The full sensor metadata passed to the validate function
          out: The current output, used to look up attribute, units, channel,
            and trigger definitions

        Raises:
          RuntimeError: An error in the schema validation or a missing
            definition.

        """
        attributes: list | None = metadata.get("attributes")
        if not attributes:
            metadata["attributes"] = []
            self._logger.debug("No attributes found, skipping")
            return

        attribute: dict
        for attribute in attributes:
            assert attribute["attribute"] in out["attributes"]
            assert attribute["units"] in out["units"]

            has_channel_name = "channel" in attribute
            has_trigger_name = "trigger" in attribute

            if has_channel_name and has_trigger_name:
                error = (
                    "Attribute instances cannot specify both channel AND "
                    f"trigger:\n{yaml.safe_dump(attribute, indent=2)}"
                )
                raise RuntimeError(error)
            if has_channel_name:
                assert attribute["channel"] in out["channels"]
            if has_trigger_name:
                assert attribute["trigger"] in out["triggers"]

    def _resolve_channels(self, metadata: dict, out: dict) -> None:
        """
        For each channel in the metadata, find the matching definition in the
        'out/channels' entry and use the data to fill any missing information.
        For example, if an entry exists that looks like:
            acceleration: {}

        We would then try to find the 'acceleration' key in the out/channels
        dict (which was already validated by _resolve_dependencies). Since the
        example above does not override any fields, we would copy the 'name',
        'description', and 'units' from the definition into the channel entry.
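
        With the example above, the entry would resolve to something like
        (the actual values come from the dependency file):
            acceleration:
              - name: "acceleration"
                description: "..."
                units: "acceleration"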

        Args:
          metadata: The full sensor metadata passed to the validate function
          out: The current output, used to get channel definitions

        Raises:
          RuntimeError: An error in the schema validation or a missing
            definition.
        """
        channels: dict | None = metadata.get("channels")
        if not channels:
            self._logger.debug("No channels found, skipping")
            metadata["channels"] = {}
            return

        channel_name: str
        indices: list[dict]
        for channel_name, indices in channels.items():
            # channel_name must have been resolved by now.
            if out["channels"].get(channel_name) is None:
                raise RuntimeError(
                    f"Failed to find a definition for '{channel_name}', did you"
                    " forget a dependency?"
                )
            channel = out["channels"][channel_name]
            # The content of 'channel' came from the 'out/channels' dict which
            # was already validated and every field added if missing. At this
            # point it's safe to access the channel's name, description, and
            # units.

            if not indices:
                indices.append({})

            index: dict
            for index in indices:
                if not index.get("name"):
                    index["name"] = channel["name"]
                if not index.get("description"):
                    index["description"] = channel["description"]
                # Always use the same units
                index["units"] = channel["units"]

    def _resolve_triggers(self, metadata: dict, out: dict) -> None:
        """
        For each trigger named in the metadata, verify that a matching
        definition exists in the 'out/triggers' dict (which was already
        populated and validated by _resolve_dependencies). For example:
            triggers:
              - data_ready

        Args:
          metadata: The full sensor metadata passed to the validate function
          out: The current output, used to get trigger definitions

        Raises:
          RuntimeError: An error in the schema validation or a missing
            definition.
        """
        triggers: list | None = metadata.get("triggers")
        if not triggers:
            metadata["triggers"] = []
            self._logger.debug("No triggers found, skipping")
            return

        for trigger_name in triggers:
            assert trigger_name in out["triggers"]

    def _get_dependency_file(self, dep: str) -> Path:
        """
        Search for a dependency file and return the full path to it if found.

        Args:
          dep: The dependency string as provided by the metadata YAML.

        Returns:
          The dependency file as a Path object if found.

        Raises:
          FileNotFoundError: One of the dependencies was not found.
        """
        error_string = f"Failed to find {dep} using search paths:"
        # Check if a full path was used
        if Path(dep).is_file():
            return Path(dep)

        # Search all the include paths
        for path in self._include_paths:
            if (path / dep).is_file():
                return path / dep
            error_string += f"\n- {path}"

        raise FileNotFoundError(error_string)