• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14"""Generate and serialize update bundles."""
15
16import argparse
17import logging
18import os
19from pathlib import Path
20import shutil
21from typing import Dict, Iterable, Optional, Tuple
22
23from pw_software_update import metadata
24from pw_software_update.tuf_pb2 import SignedRootMetadata, SignedTargetsMetadata
25from pw_software_update.update_bundle_pb2 import UpdateBundle
26
27_LOG = logging.getLogger(__package__)
28
29
30def targets_from_directory(
31        root_dir: Path,
32        exclude: Iterable[Path] = tuple(),
33        remap_paths: Optional[Dict[Path, str]] = None) -> Dict[str, Path]:
34    """Given a directory on dist, generate a dict of target names to files.
35
36    Args:
37      root_dir: Directory to crawl for targets.
38      exclude: Paths relative to root_dir to exclude as targets.
39      remap_paths: Custom target names to use for targets.
40
41    Each file in the input directory will be read in as a target file, unless
42    its path (relative to the TUF repo root) is among the excludes.
43
44    Default behavior is to treat root_dir-relative paths as the strings to use
45    as targets file names, but remapping can be used to change a target file
46    name to any string. If some remappings are provided but a file is found that
47    does not have a remapping, a warning will be logged. If a remap is declared
48    for a file that does not exist, FileNotFoundError will be raised.
49    """
50    if not root_dir.is_dir():
51        raise ValueError(
52            f'Cannot generate TUF targets from {root_dir}; not a directory.')
53    targets = {}
54    for path in root_dir.glob('**/*'):
55        if path.is_dir():
56            continue
57        rel_path = path.relative_to(root_dir)
58        if rel_path in exclude:
59            continue
60        target_name = str(rel_path.as_posix())
61        if remap_paths:
62            if rel_path in remap_paths:
63                target_name = remap_paths[rel_path]
64            else:
65                _LOG.warning('Some remaps defined, but not "%s"', target_name)
66        targets[target_name] = path
67
68    if remap_paths is not None:
69        for original_path, new_target_file_name in remap_paths.items():
70            if new_target_file_name not in targets:
71                raise FileNotFoundError(
72                    f'Unable to remap "{original_path}" to'
73                    f' "{new_target_file_name}"; file not found in root dir.')
74
75    return targets
76
77
78def gen_unsigned_update_bundle(
79        targets: Dict[Path, str],
80        persist: Optional[Path] = None,
81        targets_metadata_version: int = metadata.DEFAULT_METADATA_VERSION,
82        root_metadata: SignedRootMetadata = None) -> UpdateBundle:
83    """Given a set of targets, generates an unsigned UpdateBundle.
84
85    Args:
86      targets: A dict mapping payload Paths to their target names.
87      persist: If not None, persist the raw TUF repository to this directory.
88      targets_metadata_version: version number for the targets metadata.
89      root_metadata: Optional signed Root metadata.
90
91    The input targets will be treated as an ephemeral TUF repository for the
92    purposes of building an UpdateBundle instance. This approach differs
93    slightly from the normal concept of a TUF repository, which is typically a
94    directory on disk. For ease in debugging raw repository contents, the
95    `persist` argument can be supplied. If a persist Path is supplied, the TUF
96    repository will be persisted to disk at that location.
97
98    NOTE: If path separator characters (like '/') are used in target names, then
99    persisting the repository to disk via the 'persist' argument will create the
100    corresponding directory structure.
101
102    NOTE: If a root metadata is included, the client is expected to first
103    upgrade its on-device trusted root metadata before verifying the rest of
104    the bundle.
105    """
106    if persist:
107        if persist.exists() and not persist.is_dir():
108            raise ValueError(f'TUF repo cannot be persisted to "{persist}";'
109                             ' file exists and is not a directory.')
110        if persist.exists():
111            shutil.rmtree(persist)
112
113        os.makedirs(persist)
114
115    target_payloads = {}
116    for path, target_name in targets.items():
117        target_payloads[target_name] = path.read_bytes()
118        if persist:
119            target_persist_path = persist / target_name
120            os.makedirs(target_persist_path.parent, exist_ok=True)
121            shutil.copy(path, target_persist_path)
122
123    targets_metadata = metadata.gen_targets_metadata(
124        target_payloads, version=targets_metadata_version)
125    unsigned_targets_metadata = SignedTargetsMetadata(
126        serialized_targets_metadata=targets_metadata.SerializeToString())
127
128    return UpdateBundle(
129        root_metadata=root_metadata,
130        targets_metadata=dict(targets=unsigned_targets_metadata),
131        target_payloads=target_payloads)
132
133
134def parse_target_arg(target_arg: str) -> Tuple[Path, str]:
135    """Parse an individual target string passed in to the --targets argument.
136
137    Target strings take the following form:
138      "FILE_PATH > TARGET_NAME"
139
140    For example:
141      "fw_images/main_image.bin > main"
142    """
143    try:
144        file_path_str, target_name = target_arg.split('>')
145        return Path(file_path_str.strip()), target_name.strip()
146    except ValueError as err:
147        raise ValueError('Targets must be strings of the form:\n'
148                         '  "FILE_PATH > TARGET_NAME"') from err
149
150
151def parse_args() -> argparse.Namespace:
152    """Parse CLI arguments."""
153    parser = argparse.ArgumentParser(description=__doc__)
154    parser.add_argument('-t',
155                        '--targets',
156                        type=str,
157                        nargs='+',
158                        required=True,
159                        help='Strings defining targets to bundle')
160    parser.add_argument('-o',
161                        '--out',
162                        type=Path,
163                        required=True,
164                        help='Output path for serialized UpdateBundle')
165    parser.add_argument('--persist',
166                        type=Path,
167                        default=None,
168                        help=('If provided, TUF repo will be persisted to disk'
169                              ' at this path for debugging'))
170    parser.add_argument('--targets-metadata-version',
171                        type=int,
172                        default=metadata.DEFAULT_METADATA_VERSION,
173                        help='Version number for the targets metadata')
174    parser.add_argument('--targets-metadata-version-file',
175                        type=Path,
176                        default=None,
177                        help='Read version number string from this file. When '
178                        'provided, content of this file supersede '
179                        '--targets-metadata-version')
180    parser.add_argument('--signed-root-metadata',
181                        type=Path,
182                        default=None,
183                        help='Path to the signed Root metadata')
184    return parser.parse_args()
185
186
187def main(targets: Iterable[str],
188         out: Path,
189         persist: Path = None,
190         targets_metadata_version: int = metadata.DEFAULT_METADATA_VERSION,
191         targets_metadata_version_file: Path = None,
192         signed_root_metadata: Path = None) -> None:
193    """Generates an UpdateBundle and serializes it to disk."""
194    target_dict = {}
195    for target_arg in targets:
196        path, target_name = parse_target_arg(target_arg)
197        target_dict[path] = target_name
198
199    root_metadata = None
200    if signed_root_metadata:
201        root_metadata = SignedRootMetadata.FromString(
202            signed_root_metadata.read_bytes())
203
204    if targets_metadata_version_file:
205        with targets_metadata_version_file.open() as version_file:
206            targets_metadata_version = int(version_file.read().strip())
207
208    bundle = gen_unsigned_update_bundle(target_dict, persist,
209                                        targets_metadata_version,
210                                        root_metadata)
211
212    out.write_bytes(bundle.SerializeToString())
213
214
215if __name__ == '__main__':
216    logging.basicConfig()
217    main(**vars(parse_args()))
218