• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2024 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14"""CLI to describe a yaml sensor definition."""
15
16import argparse
17from collections.abc import Sequence
18from dataclasses import dataclass
19import importlib.resources
20import logging
21from pathlib import Path
22import subprocess
23import sys
24
25import jsonschema  # type: ignore
26import jsonschema.exceptions  # type: ignore
27from pw_sensor.validator import Validator
28import yaml
29
30logging.basicConfig(level=logging.DEBUG)
31_LOG = logging.getLogger("sensor-describe")
32
33_OUTPUT_SCHEMA = yaml.safe_load(
34    importlib.resources.read_text("pw_sensor", "resolved_schema.json")
35)
36
37
38@dataclass
39class Args:
40    """Strongly typed wrapper around the arguments provided"""
41
42    include_paths: Sequence[Path]
43    descriptor_paths: Sequence[Path]
44    generator_command: str | None
45    output_file: Path | None
46    log_level: int = logging.WARNING
47
48
49def get_args() -> Args:
50    """
51    Setup the argument parser, parse the args, and return a dataclass with the
52    arguments.
53    """
54    parser = argparse.ArgumentParser()
55    parser.add_argument(
56        "--include-path",
57        "-I",
58        action="append",
59        type=lambda p: Path(p).resolve(),
60        required=True,
61        help="Directories in which to search for dependency files",
62    )
63    parser.add_argument(
64        "--verbose",
65        "-v",
66        action="count",
67        default=0,
68        help="Increase verbosity level (can be used multiple times)",
69    )
70    parser.add_argument(
71        "--generator",
72        "-g",
73        type=str,
74        help="Generator command to run along with any flags. Data will be "
75        "passed into the generator as YAML through stdin",
76    )
77    parser.add_argument(
78        "-o",
79        dest="output",
80        type=Path,
81        help="Write output to file instead of standard out",
82    )
83    parser.add_argument(
84        "descriptors",
85        nargs="*",
86        type=lambda p: Path(p).resolve(),
87        help="One or more files to validate",
88    )
89
90    args = parser.parse_args()
91    if args.verbose == 0:
92        log_level = logging.WARNING
93    elif args.verbose == 1:
94        log_level = logging.INFO
95    else:
96        log_level = logging.DEBUG
97    return Args(
98        include_paths=args.include_path,
99        descriptor_paths=args.descriptors,
100        generator_command=args.generator,
101        output_file=args.output,
102        log_level=log_level,
103    )
104
105
106def main() -> None:
107    """
108    Main entry point to the CLI. After parsing the arguments for the below
109    parameters, the utility will validate the descriptor files and pass the
110    output to the generator:
111    - include paths
112    - verbosity
113    - generator
114    - descriptor files
115    """
116    args = get_args()
117    _LOG.setLevel(level=args.log_level)
118
119    validator = Validator(
120        include_paths=args.include_paths, log_level=args.log_level
121    )
122    superset: dict = {
123        "attributes": {},
124        "channels": {},
125        "triggers": {},
126        "sensors": {},
127    }
128    for descriptor_file in args.descriptor_paths:
129        _LOG.info("Loading '%s'", descriptor_file)
130        if not descriptor_file.is_file():
131            raise RuntimeError(f"'{descriptor_file}' is not a file")
132        with open(descriptor_file, mode="r", encoding="utf-8") as stream:
133            content = yaml.safe_load(stream=stream)
134            _LOG.debug("Validating:\n%s", yaml.safe_dump(content, indent=2))
135            content = validator.validate(content)
136            _LOG.debug("Result:\n%s", yaml.safe_dump(content, indent=2))
137        # Add sensor
138        for sensor_id, values in content["sensors"].items():
139            assert superset["sensors"].get(sensor_id) is None
140            superset["sensors"][sensor_id] = values
141        # Add channels
142        for chan_id, chan_spec in content["channels"].items():
143            assert superset["channels"].get(chan_id) is None
144            superset["channels"][chan_id] = chan_spec
145        # Add attributes
146        for attr_id, attr_spec in content["attributes"].items():
147            assert superset["attributes"].get(attr_id) is None
148            superset["attributes"][attr_id] = attr_spec
149        # Add triggers
150        for trig_id, trig_spec in content["triggers"].items():
151            assert superset["triggers"].get(trig_id) is None
152            superset["triggers"][trig_id] = trig_spec
153
154    _LOG.debug("Final descriptor:\n%s", yaml.safe_dump(superset, indent=2))
155    _LOG.info("Validating...")
156    try:
157        jsonschema.validate(instance=superset, schema=_OUTPUT_SCHEMA)
158    except jsonschema.exceptions.ValidationError as e:
159        raise RuntimeError(
160            "ERROR: Malformed merged output:\n"
161            f"{yaml.safe_dump(superset, indent=2)}"
162        ) from e
163    content_string = yaml.safe_dump(superset)
164
165    if args.generator_command:
166        cmd = args.generator_command.split(sep=" ")
167        _LOG.info("Running generator %s", cmd)
168
169        with subprocess.Popen(
170            cmd,
171            stdin=subprocess.PIPE,
172            stdout=subprocess.PIPE,
173            stderr=subprocess.PIPE,
174        ) as process:
175            assert process.stdin is not None
176            process.stdin.write(content_string.encode("utf-8"))
177            out, err = process.communicate()
178
179        if out:
180            if args.output_file:
181                with open(args.output_file, mode="w", encoding="utf-8") as o:
182                    o.write(out.decode("utf-8"))
183            else:
184                print(out.decode("utf-8"))
185        if err:
186            _LOG.error(err.decode("utf-8"))
187        if process.returncode != 0:
188            sys.exit(-1)
189
190
191if __name__ == '__main__':
192    main()
193