1# Copyright 2021 The Pigweed Authors 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); you may not 4# use this file except in compliance with the License. You may obtain a copy of 5# the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12# License for the specific language governing permissions and limitations under 13# the License. 14"""Generate and serialize update bundles.""" 15 16import argparse 17import logging 18import os 19from pathlib import Path 20import shutil 21from typing import Dict, Iterable, Optional, Tuple 22 23from pw_software_update import metadata 24from pw_software_update.tuf_pb2 import SignedRootMetadata, SignedTargetsMetadata 25from pw_software_update.update_bundle_pb2 import UpdateBundle 26 27_LOG = logging.getLogger(__package__) 28 29 30def targets_from_directory( 31 root_dir: Path, 32 exclude: Iterable[Path] = tuple(), 33 remap_paths: Optional[Dict[Path, str]] = None) -> Dict[str, Path]: 34 """Given a directory on dist, generate a dict of target names to files. 35 36 Args: 37 root_dir: Directory to crawl for targets. 38 exclude: Paths relative to root_dir to exclude as targets. 39 remap_paths: Custom target names to use for targets. 40 41 Each file in the input directory will be read in as a target file, unless 42 its path (relative to the TUF repo root) is among the excludes. 43 44 Default behavior is to treat root_dir-relative paths as the strings to use 45 as targets file names, but remapping can be used to change a target file 46 name to any string. If some remappings are provided but a file is found that 47 does not have a remapping, a warning will be logged. If a remap is declared 48 for a file that does not exist, FileNotFoundError will be raised. 49 """ 50 if not root_dir.is_dir(): 51 raise ValueError( 52 f'Cannot generate TUF targets from {root_dir}; not a directory.') 53 targets = {} 54 for path in root_dir.glob('**/*'): 55 if path.is_dir(): 56 continue 57 rel_path = path.relative_to(root_dir) 58 if rel_path in exclude: 59 continue 60 target_name = str(rel_path.as_posix()) 61 if remap_paths: 62 if rel_path in remap_paths: 63 target_name = remap_paths[rel_path] 64 else: 65 _LOG.warning('Some remaps defined, but not "%s"', target_name) 66 targets[target_name] = path 67 68 if remap_paths is not None: 69 for original_path, new_target_file_name in remap_paths.items(): 70 if new_target_file_name not in targets: 71 raise FileNotFoundError( 72 f'Unable to remap "{original_path}" to' 73 f' "{new_target_file_name}"; file not found in root dir.') 74 75 return targets 76 77 78def gen_unsigned_update_bundle( 79 targets: Dict[Path, str], 80 persist: Optional[Path] = None, 81 targets_metadata_version: int = metadata.DEFAULT_METADATA_VERSION, 82 root_metadata: SignedRootMetadata = None) -> UpdateBundle: 83 """Given a set of targets, generates an unsigned UpdateBundle. 84 85 Args: 86 targets: A dict mapping payload Paths to their target names. 87 persist: If not None, persist the raw TUF repository to this directory. 88 targets_metadata_version: version number for the targets metadata. 89 root_metadata: Optional signed Root metadata. 90 91 The input targets will be treated as an ephemeral TUF repository for the 92 purposes of building an UpdateBundle instance. This approach differs 93 slightly from the normal concept of a TUF repository, which is typically a 94 directory on disk. For ease in debugging raw repository contents, the 95 `persist` argument can be supplied. If a persist Path is supplied, the TUF 96 repository will be persisted to disk at that location. 97 98 NOTE: If path separator characters (like '/') are used in target names, then 99 persisting the repository to disk via the 'persist' argument will create the 100 corresponding directory structure. 101 102 NOTE: If a root metadata is included, the client is expected to first 103 upgrade its on-device trusted root metadata before verifying the rest of 104 the bundle. 105 """ 106 if persist: 107 if persist.exists() and not persist.is_dir(): 108 raise ValueError(f'TUF repo cannot be persisted to "{persist}";' 109 ' file exists and is not a directory.') 110 if persist.exists(): 111 shutil.rmtree(persist) 112 113 os.makedirs(persist) 114 115 target_payloads = {} 116 for path, target_name in targets.items(): 117 target_payloads[target_name] = path.read_bytes() 118 if persist: 119 target_persist_path = persist / target_name 120 os.makedirs(target_persist_path.parent, exist_ok=True) 121 shutil.copy(path, target_persist_path) 122 123 targets_metadata = metadata.gen_targets_metadata( 124 target_payloads, version=targets_metadata_version) 125 unsigned_targets_metadata = SignedTargetsMetadata( 126 serialized_targets_metadata=targets_metadata.SerializeToString()) 127 128 return UpdateBundle( 129 root_metadata=root_metadata, 130 targets_metadata=dict(targets=unsigned_targets_metadata), 131 target_payloads=target_payloads) 132 133 134def parse_target_arg(target_arg: str) -> Tuple[Path, str]: 135 """Parse an individual target string passed in to the --targets argument. 136 137 Target strings take the following form: 138 "FILE_PATH > TARGET_NAME" 139 140 For example: 141 "fw_images/main_image.bin > main" 142 """ 143 try: 144 file_path_str, target_name = target_arg.split('>') 145 return Path(file_path_str.strip()), target_name.strip() 146 except ValueError as err: 147 raise ValueError('Targets must be strings of the form:\n' 148 ' "FILE_PATH > TARGET_NAME"') from err 149 150 151def parse_args() -> argparse.Namespace: 152 """Parse CLI arguments.""" 153 parser = argparse.ArgumentParser(description=__doc__) 154 parser.add_argument('-t', 155 '--targets', 156 type=str, 157 nargs='+', 158 required=True, 159 help='Strings defining targets to bundle') 160 parser.add_argument('-o', 161 '--out', 162 type=Path, 163 required=True, 164 help='Output path for serialized UpdateBundle') 165 parser.add_argument('--persist', 166 type=Path, 167 default=None, 168 help=('If provided, TUF repo will be persisted to disk' 169 ' at this path for debugging')) 170 parser.add_argument('--targets-metadata-version', 171 type=int, 172 default=metadata.DEFAULT_METADATA_VERSION, 173 help='Version number for the targets metadata') 174 parser.add_argument('--targets-metadata-version-file', 175 type=Path, 176 default=None, 177 help='Read version number string from this file. When ' 178 'provided, content of this file supersede ' 179 '--targets-metadata-version') 180 parser.add_argument('--signed-root-metadata', 181 type=Path, 182 default=None, 183 help='Path to the signed Root metadata') 184 return parser.parse_args() 185 186 187def main(targets: Iterable[str], 188 out: Path, 189 persist: Path = None, 190 targets_metadata_version: int = metadata.DEFAULT_METADATA_VERSION, 191 targets_metadata_version_file: Path = None, 192 signed_root_metadata: Path = None) -> None: 193 """Generates an UpdateBundle and serializes it to disk.""" 194 target_dict = {} 195 for target_arg in targets: 196 path, target_name = parse_target_arg(target_arg) 197 target_dict[path] = target_name 198 199 root_metadata = None 200 if signed_root_metadata: 201 root_metadata = SignedRootMetadata.FromString( 202 signed_root_metadata.read_bytes()) 203 204 if targets_metadata_version_file: 205 with targets_metadata_version_file.open() as version_file: 206 targets_metadata_version = int(version_file.read().strip()) 207 208 bundle = gen_unsigned_update_bundle(target_dict, persist, 209 targets_metadata_version, 210 root_metadata) 211 212 out.write_bytes(bundle.SerializeToString()) 213 214 215if __name__ == '__main__': 216 logging.basicConfig() 217 main(**vars(parse_args())) 218