# Copyright 2024 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Sensor schema validation tooling."""

from collections.abc import Sequence

# BUGFIX: `import importlib` alone does not bind the `importlib.resources`
# submodule; it must be imported explicitly or the module-level schema loads
# below can raise AttributeError.
import importlib.resources
import logging
from pathlib import Path

import jsonschema  # type: ignore
import jsonschema.exceptions  # type: ignore
import yaml

# The schema files are JSON; JSON is a subset of YAML, so yaml.safe_load()
# parses them correctly.
# NOTE(review): importlib.resources.read_text() is deprecated since Python
# 3.11; consider importlib.resources.files("pw_sensor") when convenient.
_METADATA_SCHEMA = yaml.safe_load(
    importlib.resources.read_text("pw_sensor", "metadata_schema.json")
)

_DEPENDENCY_SCHEMA = yaml.safe_load(
    importlib.resources.read_text("pw_sensor", "dependency_schema.json")
)

_RESOLVED_SCHEMA = yaml.safe_load(
    importlib.resources.read_text("pw_sensor", "resolved_schema.json")
)


class Validator:
    """
    Context used for validating metadata dictionaries.

    What the validator is:
    - A system to resolve and verify that declared sensor metadata is well
      defined and formatted
    - A utility to resolve any and all dependencies when using a specified
      metadata file.

    What the validator is NOT:
    - Code generator
    """

    def __init__(
        self,
        include_paths: Sequence[Path] | None = None,
        log_level: int = logging.WARNING,
    ) -> None:
        """
        Construct a Validator with some context of the current run.

        Args:
            include_paths: An optional list of directories in which to resolve
              dependencies
            log_level: A desired logging level (defaults to logging.WARNING)
        """
        self._include_paths: Sequence[Path] = (
            include_paths if include_paths else []
        )
        self._logger = logging.getLogger(self.__class__.__name__)
        self._logger.setLevel(log_level)

    def validate(self, metadata: dict) -> dict:
        """
        Accept a structured metadata description. This dictionary should first
        pass the schema provided in metadata_schema.json. Then, every channel
        used by the sensor should be defined in exactly one of the
        dependencies. Example YAML:

            deps:
              - "pw_sensor/channels.yaml"
            compatible:
              org: "Bosch"
              part: "BMA4xx"
            channels:
              acceleration: {}
              die_temperature: {}

        Args:
            metadata: Structured sensor data, this will NOT be modified

        Returns:
            A set of channels and a single sensor which match the schema in
            resolved_schema.json.

        Raises:
            RuntimeError: An error in the schema validation or a missing
              definition.
            FileNotFoundError: One of the dependencies was not found.
        """
        result: dict = {
            "attributes": {},
            "channels": {},
            "triggers": {},
            "sensors": {},
        }
        # Shallow copy so the caller's top-level dict isn't mutated by the
        # pop() calls below (nested values are still shared with the caller).
        metadata = metadata.copy()
        try:
            jsonschema.validate(instance=metadata, schema=_METADATA_SCHEMA)
        except jsonschema.exceptions.ValidationError as e:
            raise RuntimeError(
                "ERROR: Malformed sensor metadata YAML:\n"
                f"{yaml.safe_dump(metadata, indent=2)}"
            ) from e

        # Resolve all the dependencies, after this, 'result' will have a
        # list of channel and attribute specifiers
        self._resolve_dependencies(metadata=metadata, out=result)

        self._logger.debug(yaml.safe_dump(result, indent=2))

        # Resolve all channel entries
        self._resolve_channels(metadata=metadata, out=result)

        # Resolve all attribute entries
        self._resolve_attributes(metadata=metadata, out=result)

        # Resolve all trigger entries
        self._resolve_triggers(metadata=metadata, out=result)

        # The _resolve_* helpers above guarantee these keys exist (they insert
        # empty dicts when absent), and the metadata schema guarantees
        # 'compatible' is present.
        compatible = metadata.pop("compatible")
        channels = metadata.pop("channels")
        attributes = metadata.pop("attributes")
        triggers = metadata.pop("triggers")
        result["sensors"][f"{compatible['org']},{compatible['part']}"] = {
            "compatible": compatible,
            "channels": channels,
            "attributes": attributes,
            "triggers": triggers,
        }

        try:
            jsonschema.validate(instance=result, schema=_RESOLVED_SCHEMA)
        except jsonschema.exceptions.ValidationError as e:
            raise RuntimeError(
                "ERROR: Malformed output YAML: "
                f"{yaml.safe_dump(result, indent=2)}"
            ) from e

        return result

    def _resolve_dependencies(self, metadata: dict, out: dict) -> None:
        """
        Given a list of dependencies, ensure that each of them exists and
        matches the schema provided in dependency_schema.json. Once loaded,
        the content of the definition file will be resolved (filling in any
        missing fields that can be inherited) and the final result will be
        placed in the 'out' dictionary.

        Args:
            metadata: The full sensor metadata passed to the validate function
            out: Output dictionary where the resolved dependencies should be
              stored

        Raises:
            RuntimeError: An error in the schema validation or a missing
              definition.
            FileNotFoundError: One of the dependencies was not found.
        """
        deps: None | list[str] = metadata.get("deps")
        if not deps:
            self._logger.debug("No dependencies found, skipping imports")
            out["channels"] = {}
            out["attributes"] = {}
            out["triggers"] = {}
            return

        for dep in deps:
            dep_file = self._get_dependency_file(dep)
            with open(dep_file, mode="r", encoding="utf-8") as dep_yaml_file:
                dep_yaml = yaml.safe_load(dep_yaml_file)
                try:
                    jsonschema.validate(
                        instance=dep_yaml, schema=_DEPENDENCY_SCHEMA
                    )
                except jsonschema.exceptions.ValidationError as e:
                    raise RuntimeError(
                        "ERROR: Malformed dependency YAML: "
                        f"{yaml.safe_dump(dep_yaml, indent=2)}"
                    ) from e
                self._backfill_declarations(declarations=dep_yaml, out=out)

    def _backfill_declarations(self, declarations: dict, out: dict) -> None:
        """
        Add any missing properties of a declaration object.

        Args:
            declarations: The top level declarations dictionary loaded from the
              dependency file.
            out: Output dictionary into which the resolved channels,
              attributes, and triggers are accumulated.
        """
        self._backfill_channels(declarations=declarations, out=out)
        self._backfill_attributes(declarations=declarations, out=out)
        self._backfill_triggers(declarations=declarations, out=out)

    @staticmethod
    def _backfill_attributes(declarations: dict, out: dict) -> None:
        """
        Move attributes from 'declarations' to 'out' while also filling in any
        default values.

        Args:
            declarations: The original YAML declaring attributes.
            out: Output dictionary where we'll add the key "attributes" with
              the result.

        Raises:
            RuntimeError: An attribute was declared more than once across the
              dependencies.
        """
        if out.get("attributes") is None:
            out["attributes"] = {}
        resolved_attributes: dict = out["attributes"]
        if not declarations.get("attributes"):
            return

        for attr_id, attribute in declarations["attributes"].items():
            # Duplicate definitions across dependency files are ambiguous;
            # fail loudly instead of silently overwriting. (This was an
            # `assert`, which is stripped under `python -O`.)
            if resolved_attributes.get(attr_id) is not None:
                raise RuntimeError(
                    f"Attribute '{attr_id}' is defined more than once"
                )
            resolved_attributes[attr_id] = attribute
            if not attribute.get("name"):
                attribute["name"] = attr_id
            if not attribute.get("description"):
                attribute["description"] = ""
            # The dependency schema guarantees 'units' with a 'symbol'; the
            # units name defaults to the symbol.
            if not attribute["units"].get("name"):
                attribute["units"]["name"] = attribute["units"]["symbol"]

    @staticmethod
    def _backfill_channels(declarations: dict, out: dict) -> None:
        """
        Move channels from 'declarations' to 'out' while also filling in any
        default values.

        Args:
            declarations: The original YAML declaring channels.
            out: Output dictionary where we'll add the key "channels" with the
              result.

        Raises:
            RuntimeError: A channel was declared more than once across the
              dependencies.
        """
        if out.get("channels") is None:
            out["channels"] = {}
        resolved_channels: dict = out["channels"]
        if not declarations.get("channels"):
            return

        for chan_id, channel in declarations["channels"].items():
            # Duplicate definitions across dependency files are ambiguous;
            # fail loudly instead of silently overwriting. (This was an
            # `assert`, which is stripped under `python -O`.)
            if resolved_channels.get(chan_id) is not None:
                raise RuntimeError(
                    f"Channel '{chan_id}' is defined more than once"
                )
            resolved_channels[chan_id] = channel
            if not channel.get("name"):
                channel["name"] = chan_id
            if not channel.get("description"):
                channel["description"] = ""
            units = channel["units"]
            if not units.get("name"):
                units["name"] = units["symbol"]
            # Resolve sub-channels: each becomes a top level channel named
            # '<parent>_<sub>', inheriting the parent's description and
            # sharing the parent's units dict (by reference).
            for sub, sub_channel in channel.get("sub-channels", {}).items():
                subchan_id = f"{chan_id}_{sub}"
                if sub_channel.get("name") is None:
                    sub_channel["name"] = subchan_id
                if sub_channel.get("description") is None:
                    sub_channel["description"] = channel.get("description")
                sub_channel["units"] = channel["units"]
                resolved_channels[subchan_id] = sub_channel
            channel.pop("sub-channels", None)

    @staticmethod
    def _backfill_triggers(declarations: dict, out: dict) -> None:
        """
        Move triggers from 'declarations' to 'out' while also filling in any
        default values.

        Args:
            declarations: The original YAML declaring triggers.
            out: Output dictionary where we'll add the key "triggers" with the
              result.

        Raises:
            RuntimeError: A trigger was declared more than once across the
              dependencies.
        """
        if out.get("triggers") is None:
            out["triggers"] = {}
        resolved_triggers: dict = out["triggers"]
        if not declarations.get("triggers"):
            return

        for trigger_id, trigger in declarations["triggers"].items():
            # Duplicate definitions across dependency files are ambiguous;
            # fail loudly instead of silently overwriting. (This was an
            # `assert`, which is stripped under `python -O`.)
            if resolved_triggers.get(trigger_id) is not None:
                raise RuntimeError(
                    f"Trigger '{trigger_id}' is defined more than once"
                )
            resolved_triggers[trigger_id] = trigger
            if not trigger.get("name"):
                trigger["name"] = trigger_id
            if not trigger.get("description"):
                trigger["description"] = ""

    def _resolve_attributes(self, metadata: dict, out: dict) -> None:
        """
        For each attribute in the metadata, find the matching definition in
        the 'out/attributes' entry and use the data to fill any missing
        information. For example, if an entry exists that looks like:
            sample_rate: {}

        We would then try and find the 'sample_rate' key in the out/attributes
        list (which was already validated by _resolve_dependencies). Since the
        example above does not override any fields, we would copy the 'name',
        'description', and 'units' from the definition into the attribute
        entry.

        Args:
            metadata: The full sensor metadata passed to the validate function
            out: The current output, used to get attribute definitions

        Raises:
            RuntimeError: An error in the schema validation or a missing
              definition.
        """
        attributes: dict | None = metadata.get("attributes")
        if not attributes:
            metadata["attributes"] = {}
            self._logger.debug("No attributes found, skipping")
            return

        for attribute_name, attribute_value in attributes.items():
            # Check if the attribute_name exists in 'out/attributes', we can
            # assume 'out/attributes' exists because _resolve_dependencies()
            # is required to have been called first.
            attribute = self._check_scalar_name(
                name=attribute_name,
                haystack=out["attributes"],
                overrides=attribute_value,
            )
            # The content of 'attribute' came from the 'out/attributes' list
            # which was already validated and every field added if missing. At
            # this point it's safe to access the attribute's name,
            # description, and units.
            attribute_value["name"] = attribute["name"]
            attribute_value["description"] = attribute["description"]
            attribute_value["units"] = attribute["units"]

    def _resolve_channels(self, metadata: dict, out: dict) -> None:
        """
        For each channel in the metadata, find the matching definition in the
        'out/channels' entry and use the data to fill any missing information.
        For example, if an entry exists that looks like:
            acceleration: {}

        We would then try and find the 'acceleration' key in the out/channels
        dict (which was already validated by _resolve_dependencies). Since the
        example above does not override any fields, we would copy the 'name',
        'description', and 'units' from the definition into the channel entry.

        Args:
            metadata: The full sensor metadata passed to the validate function
            out: The current output, used to get channel definitions

        Raises:
            RuntimeError: An error in the schema validation or a missing
              definition.
        """
        channels: dict | None = metadata.get("channels")
        if not channels:
            self._logger.debug("No channels found, skipping")
            metadata["channels"] = {}
            return

        for channel_name, channel_values in channels.items():
            # Check if the channel_name exists in 'out/channels', we can
            # assume 'out/channels' exists because _resolve_dependencies() is
            # required to have been called first.
            channel = self._check_scalar_name(
                name=channel_name,
                haystack=out["channels"],
                overrides=channel_values,
            )
            # The content of 'channel' came from the 'out/channels' dict which
            # was already validated and every field added if missing. At this
            # point it's safe to access the channel's name, description, and
            # units.
            channel_values["name"] = channel["name"]
            channel_values["description"] = channel["description"]
            channel_values["units"] = channel["units"]

            # NOTE: "indicies" [sic] is the key used by the schema files; the
            # spelling must not be "corrected" here. Each index inherits the
            # channel's name/description when not explicitly set.
            if not channel_values.get("indicies"):
                channel_values["indicies"] = [{}]
            for index in channel_values["indicies"]:
                if not index.get("name"):
                    index["name"] = channel_values["name"]
                if not index.get("description"):
                    index["description"] = channel_values["description"]

    def _resolve_triggers(self, metadata: dict, out: dict) -> None:
        """
        For each trigger in the metadata, find the matching definition in the
        'out/triggers' entry and use the data to fill any missing information.
        For example, if an entry exists that looks like:
            data_ready: {}

        We would then try and find the 'data_ready' key in the out/triggers
        dict (which was already validated by _resolve_dependencies). Since the
        example above does not override any fields, we would copy the 'name'
        and 'description' from the definition into the trigger entry.

        Args:
            metadata: The full sensor metadata passed to the validate function
            out: The current output, used to get trigger definitions

        Raises:
            RuntimeError: An error in the schema validation or a missing
              definition.
        """
        triggers: dict | None = metadata.get("triggers")
        if not triggers:
            metadata["triggers"] = {}
            self._logger.debug("No triggers found, skipping")
            return

        for trigger_name, trigger_value in triggers.items():
            # Check if the trigger_name exists in 'out/triggers', we can
            # assume 'out/triggers' exists because _resolve_dependencies() is
            # required to have been called first.
            trigger = self._check_scalar_name(
                name=trigger_name,
                haystack=out["triggers"],
                overrides=trigger_value,
            )
            # The content of 'trigger' came from the 'out/triggers' dict
            # which was already validated and every field added if missing. At
            # this point it's safe to access the trigger's name and
            # description.
            trigger_value["name"] = trigger["name"]
            trigger_value["description"] = trigger["description"]

    def _get_dependency_file(self, dep: str) -> Path:
        """
        Search for a dependency file and return the full path to it if found.

        Args:
            dep: The dependency string as provided by the metadata yaml.

        Returns:
            The dependency file as a Path object if found.

        Raises:
            FileNotFoundError: One of the dependencies was not found.
        """
        error_string = f"Failed to find {dep} using search paths:"
        # Check if a full path was used
        if Path(dep).is_file():
            return Path(dep)

        # Search all the include paths
        for path in self._include_paths:
            if (path / dep).is_file():
                return path / dep
            error_string += f"\n- {path}"

        raise FileNotFoundError(error_string)

    @staticmethod
    def _check_scalar_name(name: str, haystack: dict, overrides: dict) -> dict:
        """
        Given a name and the resolved list of dependencies, try to find
        the full definition of a scalar (channel or attribute) with the name,
        description, and units OR trigger with just a name/description.

        We rely on the schema to ensure that channels and attributes have
        units so if we can't find units, then we must be looking for a
        trigger.

        Args:
            name: The name of the channel/attribute/trigger to search for in
              the dependencies
            haystack: The dictionary of resolved properties which define the
              available channels/attributes/triggers
            overrides: Per-sensor values which take precedence over the
              definition's 'name' and 'description'

        Returns:
            A dictionary with the following structure:
                name: string
                description: string
                (optional)
                units:
                  name: string
                  symbol: string

        Raises:
            RuntimeError: If the 'name' isn't in the dependency list
        """
        # Check if we can find 'name' in the 'haystack' dictionary
        if not haystack.get(name):
            raise RuntimeError(
                f"Failed to find a definition for '{name}', did you forget a "
                "dependency?"
            )

        item = haystack[name]
        name = overrides.get("name", item.get("name", name))
        description = overrides.get("description", item.get("description", ""))
        # No units means this is a trigger definition (see docstring above).
        if item.get("units") is None:
            return {
                "name": name,
                "description": description,
            }

        units = {
            "name": item["units"].get("name", item["units"]["symbol"]),
            "symbol": item["units"]["symbol"],
        }
        return {
            "name": name,
            "description": description,
            "units": units,
        }