# Copyright 2024 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Sensor schema validation tooling."""

from collections.abc import Sequence
import importlib.resources
import logging
from pathlib import Path
import re

import jsonschema  # type: ignore
import jsonschema.exceptions  # type: ignore
import yaml

_METADATA_SCHEMA = yaml.safe_load(
    importlib.resources.read_text("pw_sensor", "metadata_schema.json")
)

_DEPENDENCY_SCHEMA = yaml.safe_load(
    importlib.resources.read_text("pw_sensor", "dependency_schema.json")
)

_RESOLVED_SCHEMA = yaml.safe_load(
    importlib.resources.read_text("pw_sensor", "resolved_schema.json")
)


class Validator:
    """
    Context used for validating metadata dictionaries.

    What the validator is:
    - A system to resolve and verify that declared sensor metadata is well
      defined and formatted
    - A utility to resolve any and all dependencies when using a specified
      metadata file.

    What the validator is NOT:
    - A code generator
    """

    def __init__(
        self,
        include_paths: Sequence[Path] | None = None,
        log_level: int = logging.WARNING,
    ) -> None:
        """
        Construct a Validator with some context of the current run.

        Args:
            include_paths: An optional list of directories in which to resolve
                dependencies
            log_level: A desired logging level (defaults to logging.WARNING)
        """
        self._include_paths = include_paths if include_paths else []
        self._logger = logging.getLogger(self.__class__.__name__)
        self._logger.setLevel(log_level)

    def validate(self, metadata: dict) -> dict:
        """
        Accept a structured metadata description. This dictionary should first
        pass the schema provided in metadata_schema.json. Then, every channel
        used by the sensor should be defined in exactly one of the
        dependencies. Example YAML:

            deps:
              - "pw_sensor/channels.yaml"
            compatible:
              org: "Bosch"
              part: "BMA4xx"
            supported-buses:
              - i2c
            channels:
              acceleration: []
              die_temperature: []

        Args:
            metadata: Structured sensor data; this will NOT be modified

        Returns:
            A set of attributes, channels, triggers, and units along with a
            single sensor which match the schema in resolved_schema.json.

        Raises:
            RuntimeError: An error in the schema validation or a missing
                definition.
            FileNotFoundError: One of the dependencies was not found.
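
        Example usage (an illustrative sketch only; the include path below is
        hypothetical and 'metadata_yaml' is assumed to hold the YAML above,
        already loaded with yaml.safe_load):

            validator = Validator(include_paths=[Path("path/to/definitions")])
            result = validator.validate(metadata=metadata_yaml)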
99 """ 100 result: dict = { 101 "attributes": {}, 102 "channels": {}, 103 "triggers": {}, 104 "units": {}, 105 "sensors": {}, 106 } 107 metadata = metadata.copy() 108 109 # Validate the incoming schema 110 try: 111 jsonschema.validate(instance=metadata, schema=_METADATA_SCHEMA) 112 except jsonschema.exceptions.ValidationError as e: 113 raise RuntimeError( 114 "ERROR: Malformed sensor metadata YAML:\n" 115 f"{yaml.safe_dump(metadata, indent=2)}" 116 ) from e 117 118 # Resolve all the dependencies, after this, 'result' will have all the 119 # missing properties for which defaults can be provided 120 self._resolve_dependencies(metadata=metadata, out=result) 121 122 self._logger.debug( 123 "Resolved dependencies:\n%s", yaml.safe_dump(result, indent=2) 124 ) 125 126 # Resolve all channel entries (must be done before attributes) 127 self._resolve_channels(metadata=metadata, out=result) 128 129 # Resolve all trigger entries (must be done before attributes) 130 self._resolve_triggers(metadata=metadata, out=result) 131 132 # Resolve all attribute entries 133 self._resolve_attributes(metadata=metadata, out=result) 134 135 compatible, compatible_str = Validator._get_compatible_string_and_dict( 136 metadata.pop("compatible") 137 ) 138 supported_buses = metadata.pop("supported-buses") 139 channels = metadata.pop("channels") 140 attributes = metadata.pop("attributes") 141 triggers = metadata.pop("triggers") 142 143 result["sensors"][compatible_str] = { 144 "compatible": compatible, 145 "supported-buses": self._normalize_supported_buses(supported_buses), 146 "channels": channels, 147 "attributes": attributes, 148 "triggers": triggers, 149 "description": metadata.get("description", ""), 150 "extras": metadata.get("extras", {}), 151 } 152 153 # Validate the final output before returning 154 try: 155 jsonschema.validate(instance=result, schema=_RESOLVED_SCHEMA) 156 except jsonschema.exceptions.ValidationError as e: 157 msg = ( 158 "ERROR: Malformed output YAML: " 159 f"{yaml.safe_dump(result, indent=2)}" 160 ) 161 raise RuntimeError(msg) from e 162 163 return result 164 165 @staticmethod 166 def _normalize_supported_buses(buses: list[str]) -> list[str]: 167 """Resolve a list of supported buses 168 169 Each bus string will be converted to lowercase and all sequential 170 whitespace & '-' characters will be replaced by a single '_'. 171 172 Args: 173 buses: A list of the supported sensor buses 174 175 Returns: 176 Normalized list of buses 177 178 """ 179 filtered_list = list( 180 {re.sub(r"[\s\-]+", "_", s.lower()) for s in buses} 181 ) 182 if len(buses) != len(filtered_list): 183 error = ( 184 "ERROR: bus list contains duplicates when converted to " 185 f"lowercase and concatenated with '_': {sorted(buses)} -> " 186 f"{sorted(filtered_list)}" 187 ) 188 raise RuntimeError(error) 189 return filtered_list 190 191 @staticmethod 192 def _get_compatible_string_and_dict( 193 compatible: dict[str, str], 194 ) -> tuple[dict[str, str], str]: 195 """ 196 Normalize compatible info 197 198 This function processes a 'compatible' dictionary with a 'part' key and 199 an optional 'org' key. It returns a new dictionary with the 'org' key 200 removed if it was empty or missing, and a formatted string based on the 201 'org' key's presence and value. 202 203 Args: 204 compatible (dict[str, str]): A dictionary with a 'part' key and an 205 optional 'org' key. 206 207 Returns: 208 Tuple[dict[str, str], str]: A tuple containing: 209 - A new dictionary with the 'org' key removed if it was empty or 210 missing. 

        """
        part = compatible["part"].lower()
        org = compatible.get("org", "").strip().lower()

        new_compatible = {"part": part}
        if org:
            new_compatible["org"] = org
            return new_compatible, f"{org},{part}"
        return new_compatible, part

    def _resolve_dependencies(self, metadata: dict, out: dict) -> None:
        """
        Given a list of dependencies, ensure that each of them exists and
        matches the schema provided in dependency_schema.json. Once loaded, the
        content of the definition file will be resolved (filling in any missing
        fields that can be inherited) and the final result will be placed in
        the 'out' dictionary.

        Args:
            metadata: The full sensor metadata passed to the validate function
            out: Output dictionary where the resolved dependencies should be
                stored

        Raises:
            RuntimeError: An error in the schema validation or a missing
                definition.
            FileNotFoundError: One of the dependencies was not found.
        """
        deps: None | list[str] = metadata.get("deps")
        if not deps:
            self._logger.debug("No dependencies found, skipping imports")
            return

        merged_deps: dict = {
            "attributes": {},
            "channels": {},
            "triggers": {},
            "units": {},
        }
        for dep in deps:
            # Load each of the dependencies, then merge them. This avoids any
            # include dependency order issues.
            dep_file = self._get_dependency_file(dep)
            with open(dep_file, mode="r", encoding="utf-8") as dep_yaml_file:
                dep_yaml = yaml.safe_load(dep_yaml_file)
                try:
                    jsonschema.validate(
                        instance=dep_yaml, schema=_DEPENDENCY_SCHEMA
                    )
                except jsonschema.exceptions.ValidationError as e:
                    raise RuntimeError(
                        "ERROR: Malformed dependency YAML: "
                        f"{yaml.safe_dump(dep_yaml, indent=2)}"
                    ) from e
                # Merge all the loaded values into 'merged_deps'
                for category in merged_deps:
                    self._merge_deps(
                        category=category,
                        dep_yaml=dep_yaml,
                        merged_deps=merged_deps,
                    )
        # Backfill any default values from the merged dependencies and put them
        # into 'out'
        self._backfill_declarations(declarations=merged_deps, out=out)

    @staticmethod
    def _merge_deps(category: str, dep_yaml: dict, merged_deps: dict) -> None:
        """
        Pull all properties from dep_yaml[category] and put them into
        merged_deps after validating that no key duplicates exist.

        Args:
            category: The key of dep_yaml and merged_deps to merge
            dep_yaml: The newly loaded dependency YAML
            merged_deps: The accumulated dependency map
        """
        for key, value in dep_yaml.get(category, {}).items():
            assert (
                key not in merged_deps[category]
            ), f"'{key}' was already found under '{category}'"
            merged_deps[category][key] = value

    def _backfill_declarations(self, declarations: dict, out: dict) -> None:
        """
        Add any missing properties of a declaration object.

        Args:
            declarations: The top level declarations dictionary loaded from the
                dependency file.
            out: The already resolved map of all defined dependencies
        """
        self._backfill_units(declarations=declarations, out=out)
        self._backfill_channels(declarations=declarations, out=out)
        self._backfill_attributes(declarations=declarations, out=out)
        self._backfill_triggers(declarations=declarations, out=out)

    @staticmethod
    def _backfill_units(declarations: dict, out: dict) -> None:
        """
        Move units from 'declarations' to 'out' while also filling in any
        default values.

        Args:
            declarations: The original YAML declaring units.
            out: Output dictionary where we'll add the key "units" with the
                result.
        """
        if out.get("units") is None:
            out["units"] = {}
        resolved_units: dict = out["units"]
        if not declarations.get("units"):
            return

        for units_id, unit in declarations["units"].items():
            # Copy unit to resolved_units and fill any default values
            assert resolved_units.get(units_id) is None
            resolved_units[units_id] = unit
            if not unit.get("name"):
                unit["name"] = unit["symbol"]
            if unit.get("description") is None:
                unit["description"] = ""

    @staticmethod
    def _backfill_attributes(declarations: dict, out: dict) -> None:
        """
        Move attributes from 'declarations' to 'out' while also filling in any
        default values.

        Args:
            declarations: The original YAML declaring attributes.
            out: Output dictionary where we'll add the key "attributes" with
                the result.
        """
        if out.get("attributes") is None:
            out["attributes"] = {}
        resolved_attributes: dict = out["attributes"]
        if not declarations.get("attributes"):
            return

        for attr_id, attribute in declarations["attributes"].items():
            # Copy attribute to resolved_attributes and fill any default values
            assert resolved_attributes.get(attr_id) is None
            resolved_attributes[attr_id] = attribute
            if not attribute.get("name"):
                attribute["name"] = attr_id
            if not attribute.get("description"):
                attribute["description"] = ""

    @staticmethod
    def _backfill_channels(declarations: dict, out: dict) -> None:
        """
        Move channels from 'declarations' to 'out' while also filling in any
        default values.

        Args:
            declarations: The original YAML declaring channels.
            out: Output dictionary where we'll add the key "channels" with the
                result.
        """
        if out.get("channels") is None:
            out["channels"] = {}
        resolved_channels: dict = out["channels"]
        if not declarations.get("channels"):
            return

        for chan_id, channel in declarations["channels"].items():
            # Copy channel to resolved_channels and fill any default values
            assert resolved_channels.get(chan_id) is None
            resolved_channels[chan_id] = channel
            if not channel.get("name"):
                channel["name"] = chan_id
            if not channel.get("description"):
                channel["description"] = ""
            assert channel["units"] in out["units"], (
                f"'{channel['units']}' not found in\n"
                + f"{yaml.safe_dump(out.get('units', {}), indent=2)}"
            )

    @staticmethod
    def _backfill_triggers(declarations: dict, out: dict) -> None:
        """
        Move triggers from 'declarations' to 'out' while also filling in any
        default values.

        Args:
            declarations: The original YAML declaring triggers.
            out: Output dictionary where we'll add the key "triggers" with the
                result.
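
        Example (hypothetical trigger declaration):
            {"data_ready": {}} becomes
            {"data_ready": {"name": "data_ready", "description": ""}}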
402 """ 403 if out.get("triggers") is None: 404 out["triggers"] = {} 405 resolved_triggers: dict = out["triggers"] 406 if not declarations.get("triggers"): 407 return 408 409 for trigger_id, trigger in declarations["triggers"].items(): 410 # Copy trigger to resolved_triggers and fill any default values 411 assert resolved_triggers.get(trigger_id) is None 412 resolved_triggers[trigger_id] = trigger 413 if not trigger.get("name"): 414 trigger["name"] = trigger_id 415 if not trigger.get("description"): 416 trigger["description"] = "" 417 418 def _resolve_attributes(self, metadata: dict, out: dict) -> None: 419 """Resolve and validate any default values in Attributes 420 421 For each attribute in the metadta, find the matching definition in the 422 'out/attributes' entry and use the data to fill any missing information. 423 For example, if an entry exists that looks like: 424 sample_rate: {} 425 426 We would then try and find the 'sample_rate' key in the out/attributes 427 list (which was already validated by _resolve_dependencies). Since the 428 example above does not override any fields, we would copy the 'name', 429 'description', and 'units' from the definition into the attribute entry. 430 431 Args: 432 metadata: The full sensor metadata passed to the validate function 433 out: The current output, used to get channel definitions 434 435 Raises: 436 RuntimeError: An error in the schema validation or a missing 437 definition. 438 439 """ 440 attributes: list | None = metadata.get("attributes") 441 if not attributes: 442 metadata["attributes"] = [] 443 self._logger.debug("No attributes found, skipping") 444 return 445 446 attribute: dict 447 for attribute in attributes: 448 assert attribute["attribute"] in out["attributes"] 449 assert attribute["units"] in out["units"] 450 451 has_channel_name = "channel" in attribute 452 has_trigger_name = "trigger" in attribute 453 454 if has_channel_name and has_trigger_name: 455 error = ( 456 "Attribute instances cannot specify both channel AND " 457 f"trigger:\n{yaml.safe_dump(attribute, indent=2)}" 458 ) 459 raise RuntimeError(error) 460 if has_channel_name: 461 assert attribute["channel"] in out["channels"] 462 if has_trigger_name: 463 assert attribute["trigger"] in out["triggers"] 464 465 def _resolve_channels(self, metadata: dict, out: dict) -> None: 466 """ 467 For each channel in the metadata, find the matching definition in the 468 'out/channels' entry and use the data to fill any missing information. 469 For example, if an entry exists that looks like: 470 acceleration: {} 471 472 We would then try and find the 'acceleration' key in the out/channels 473 dict (which was already validated by _resolve_dependencies). Since the 474 example above does not override any fields, we would copy the 'name', 475 'description', and 'units' from the definition into the channel entry. 476 477 Args: 478 metadata: The full sensor metadata passed to the validate function 479 out: The current output, used to get channel definitions 480 481 Raises: 482 RuntimeError: An error in the schema validation or a missing 483 definition. 484 """ 485 channels: dict | None = metadata.get("channels") 486 if not channels: 487 self._logger.debug("No channels found, skipping") 488 metadata["channels"] = {} 489 return 490 491 channel_name: str 492 indices: list[dict] 493 for channel_name, indices in channels.items(): 494 # channel_name must have been resolved by now. 
495 if out["channels"].get(channel_name) is None: 496 raise RuntimeError( 497 f"Failed to find a definition for '{channel_name}', did you" 498 " forget a dependency?" 499 ) 500 channel = out["channels"][channel_name] 501 # The content of 'channel' came from the 'out/channels' dict which 502 # was already validated and every field added if missing. At this 503 # point it's safe to access the channel's name, description, and 504 # units. 505 506 if not indices: 507 indices.append({}) 508 509 index: dict 510 for index in indices: 511 if not index.get("name"): 512 index["name"] = channel["name"] 513 if not index.get("description"): 514 index["description"] = channel["description"] 515 # Always use the same units 516 index["units"] = channel["units"] 517 518 def _resolve_triggers(self, metadata: dict, out: dict) -> None: 519 """ 520 For each trigger in the metadata, find the matching definition in the 521 'out/triggers' entry and use the data to fill any missing information. 522 For example, if an entry exists that looks like: 523 data_ready: {} 524 525 We would then try and find the 'data_ready' key in the out/triggers 526 dict (which was already validated by _resolve_dependencies). Since the 527 example above does not override any fields, we would copy the 'name' and 528 'description' from the definition into the trigger entry. 529 530 Args: 531 metadata: The full sensor metadata passed to the validate function 532 out: The current output, used to get trigger definitions 533 534 Raises: 535 RuntimeError: An error in the schema validation or a missing 536 definition. 537 """ 538 triggers: list | None = metadata.get("triggers") 539 if not triggers: 540 metadata["triggers"] = [] 541 self._logger.debug("No triggers found, skipping") 542 return 543 544 for trigger_name in triggers: 545 assert trigger_name in out["triggers"] 546 547 def _get_dependency_file(self, dep: str) -> Path: 548 """ 549 Search for a dependency file and return the full path to it if found. 550 551 Args: 552 dep: The dependency string as provided by the metadata yaml. 553 554 Returns: 555 The dependency file as a Path object if found. 556 557 Raises: 558 FileNotFoundError: One of the dependencies was not found. 559 """ 560 error_string = f"Failed to find {dep} using search paths:" 561 # Check if a full path was used 562 if Path(dep).is_file(): 563 return Path(dep) 564 565 # Search all the include paths 566 for path in self._include_paths: 567 if (path / dep).is_file(): 568 return path / dep 569 error_string += f"\n- {path}" 570 571 raise FileNotFoundError(error_string) 572