• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2022 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14"""
15Software update related operations.
16
17Learn more at: pigweed.dev/pw_software_update
18
19"""
20
21import argparse
22import os
23import sys
24from pathlib import Path
25
26from pw_software_update import (
27    dev_sign,
28    keys,
29    metadata,
30    root_metadata,
31    update_bundle,
32)
33from pw_software_update.tuf_pb2 import (
34    RootMetadata,
35    SignedRootMetadata,
36    TargetsMetadata,
37)
38from pw_software_update.update_bundle_pb2 import UpdateBundle
39
40
def inspect_bundle_handler(arg) -> None:
    """Print the contents of the bundle file at ``arg.pathname``.

    Output order: the deserialized 'targets' metadata, then one entry per
    target payload (name, length and the first 32 bytes as repr and hex),
    then the deserialized root metadata. An IOError is printed instead of
    being propagated.
    """

    try:
        bundle = UpdateBundle.FromString(arg.pathname.read_bytes())

        # Deserialize before printing anything so a parse failure produces
        # no partial "Targets Metadata" section.
        serialized_targets = bundle.targets_metadata[
            'targets'
        ].serialized_targets_metadata
        targets_view = TargetsMetadata.FromString(serialized_targets)
        print('Targets Metadata:')
        print('=================')
        print(targets_view)

        print('\nTarget Files:')
        print('=============')
        payloads = bundle.target_payloads
        for index, (name, contents) in enumerate(payloads.items(), start=1):
            head = contents[:32]
            print(f'{index} of {len(payloads)}:')
            print(f'  filename: {name}')
            print(f'  length: {len(contents)}')
            print(f'  ascii contents(first 32 bytes): {head!r}')
            print(f'  hex contents(first 32 bytes): {head.hex()}\n')

        root_view = RootMetadata.FromString(
            bundle.root_metadata.serialized_root_metadata
        )
        print('\nRoot Metadata:')
        print('==============')
        print(root_view)

    except IOError as error:
        print(error)
75
76
def _new_inspect_bundle_parser(subparsers) -> None:
    """Register the ``inspect-bundle`` subcommand on ``subparsers``.

    The subcommand takes one positional ``pathname`` and dispatches to
    ``inspect_bundle_handler`` through the ``func`` default.
    """

    def formatter_class(prog):
        # argparse passes ``prog`` by keyword, so the name must stay as is.
        return argparse.HelpFormatter(prog, max_help_position=100, width=200)

    parser = subparsers.add_parser(
        'inspect-bundle',
        description='Outputs contents of bundle',
        formatter_class=formatter_class,
        help="",
    )
    parser.set_defaults(func=inspect_bundle_handler)
    parser.add_argument('pathname', type=Path, help='Path to bundle')
94
95
def sign_bundle_handler(arg) -> None:
    """Sign the bundle at ``arg.bundle`` with the key at ``arg.key``.

    The bundle file is read, passed to ``dev_sign.sign_update_bundle``
    together with the raw key bytes, and the result is written back to the
    same path. An IOError is printed instead of being propagated.
    """

    try:
        unsigned_bundle = UpdateBundle.FromString(arg.bundle.read_bytes())
        signing_key = arg.key.read_bytes()
        signed_bundle = dev_sign.sign_update_bundle(
            unsigned_bundle, signing_key
        )
        # Overwrites the input bundle file with the signed version.
        arg.bundle.write_bytes(signed_bundle.SerializeToString())

    except IOError as error:
        print(error)
108
109
def _new_sign_bundle_parser(subparsers) -> None:
    """Register the ``sign-bundle`` subcommand on ``subparsers``.

    Both ``--bundle`` and ``--key`` are required paths; the subcommand
    dispatches to ``sign_bundle_handler`` through the ``func`` default.
    """

    def formatter_class(prog):
        # argparse passes ``prog`` by keyword, so the name must stay as is.
        return argparse.HelpFormatter(prog, max_help_position=100, width=200)

    parser = subparsers.add_parser(
        'sign-bundle',
        description='Sign an existing bundle using a development key',
        formatter_class=formatter_class,
        help="",
    )
    parser.set_defaults(func=sign_bundle_handler)

    required = parser.add_argument_group('required arguments')
    for flag, metavar, description in (
        ('--bundle', 'BUNDLE', 'Bundle to be signed'),
        ('--key', 'KEY', 'Bundle signing key'),
    ):
        required.add_argument(
            flag,
            help=description,
            metavar=metavar,
            required=True,
            type=Path,
        )
141
142
def add_file_to_bundle(
    bundle: UpdateBundle, file_name: str, file_contents: bytes
) -> UpdateBundle:
    """Adds a target file to an existing UpdateBundle.

    The payload is stored under file_name in bundle.target_payloads and a
    matching entry, built by metadata.gen_target_file, is appended to the
    bundle's 'targets' metadata.

    All duplicate checks run before anything is written, so a rejected call
    does not leave a payload behind without a metadata entry.

    Args:
      bundle: The bundle to update; it is modified in place.
      file_name: Name of the target file inside the bundle.
      file_contents: Raw contents of the target file.

    Returns:
      The same bundle object, after the update.

    Raises:
      Exception: If file_name is already present in the bundle's payloads
        or in its 'targets' metadata.
    """

    if file_name in bundle.target_payloads:
        raise Exception(f'File name {file_name} already exists in bundle')

    signed_targets_metadata = bundle.targets_metadata['targets']
    targets_metadata = TargetsMetadata.FromString(
        signed_targets_metadata.serialized_targets_metadata
    )

    if any(
        target.file_name == file_name
        for target in targets_metadata.target_files
    ):
        raise Exception(f'File name {file_name} already exists in bundle')

    # Build the metadata entry before touching the bundle so a failure here
    # cannot leave the bundle half-updated either.
    target_file = metadata.gen_target_file(file_name, file_contents)

    bundle.target_payloads[file_name] = file_contents
    targets_metadata.target_files.append(target_file)
    bundle.targets_metadata[
        'targets'
    ].serialized_targets_metadata = targets_metadata.SerializeToString()

    return bundle
179
180
def add_file_to_bundle_handler(arg) -> None:
    """Add the file at ``arg.file`` to the bundle at ``arg.bundle``.

    The target name is ``arg.new_name`` when given; otherwise it is the
    file's base name with its last extension removed. The updated bundle
    is written back to ``arg.bundle``. An IOError is printed instead of
    being propagated; the duplicate-name error raised by
    ``add_file_to_bundle`` is not caught here.
    """

    try:
        file_name = (
            arg.new_name
            or os.path.splitext(os.path.basename(arg.file))[0]
        )

        existing_bundle = UpdateBundle.FromString(arg.bundle.read_bytes())
        payload = arg.file.read_bytes()
        updated_bundle = add_file_to_bundle(
            bundle=existing_bundle,
            file_name=file_name,
            file_contents=payload,
        )

        arg.bundle.write_bytes(updated_bundle.SerializeToString())

    except IOError as error:
        print(error)
203
204
def _new_add_file_to_bundle_parser(subparsers) -> None:
    """Register the ``add-file-to-bundle`` subcommand on ``subparsers``.

    ``--bundle`` and ``--file`` are required paths and are listed under the
    'required arguments' group. ``--new-name`` is optional and is therefore
    registered on the parser itself so help output does not label it as
    required. The subcommand dispatches to ``add_file_to_bundle_handler``
    through the ``func`` default.
    """

    def formatter_class(prog):
        # argparse passes ``prog`` by keyword, so the name must stay as is.
        return argparse.HelpFormatter(prog, max_help_position=100, width=200)

    add_file_to_bundle_parser = subparsers.add_parser(
        'add-file-to-bundle',
        description='Add a file to an existing bundle',
        formatter_class=formatter_class,
        help="",
    )
    add_file_to_bundle_parser.set_defaults(func=add_file_to_bundle_handler)
    required_arguments = add_file_to_bundle_parser.add_argument_group(
        'required arguments'
    )
    required_arguments.add_argument(
        '--bundle',
        help='Path to an existing bundle',
        metavar='BUNDLE',
        required=True,
        type=Path,
    )
    required_arguments.add_argument(
        '--file',
        help='Path to a target file',
        metavar='FILE_PATH',
        required=True,
        type=Path,
    )
    # Not required, so it must not live in the 'required arguments' group.
    add_file_to_bundle_parser.add_argument(
        '--new-name',
        help='Optional new name for target',
        metavar='NEW_NAME',
        required=False,
        type=str,
    )
242
243
def add_root_metadata_to_bundle_handler(arg) -> None:
    """Copy signed root metadata into the bundle at ``arg.bundle``.

    The signed root metadata is read from ``arg.append_root_metadata`` and
    copied into the bundle's ``root_metadata`` field; the bundle file is
    then rewritten in place. An IOError is printed instead of being
    propagated.
    """

    try:
        bundle = UpdateBundle.FromString(arg.bundle.read_bytes())
        signed_root = SignedRootMetadata.FromString(
            arg.append_root_metadata.read_bytes()
        )
        bundle.root_metadata.CopyFrom(signed_root)
        arg.bundle.write_bytes(bundle.SerializeToString())

    except IOError as error:
        print(error)
258
259
def _new_add_root_metadata_to_bundle_parser(subparsers) -> None:
    """Register the ``add-root-metadata-to-bundle`` subcommand.

    Both ``--append-root-metadata`` and ``--bundle`` are required paths;
    the subcommand dispatches to ``add_root_metadata_to_bundle_handler``
    through the ``func`` default.
    """

    def formatter_class(prog):
        # argparse passes ``prog`` by keyword, so the name must stay as is.
        return argparse.HelpFormatter(prog, max_help_position=100, width=200)

    parser = subparsers.add_parser(
        'add-root-metadata-to-bundle',
        description='Add root metadata to a bundle',
        formatter_class=formatter_class,
        help="",
    )
    parser.set_defaults(func=add_root_metadata_to_bundle_handler)

    required = parser.add_argument_group('required arguments')
    for flag, metavar, description in (
        ('--append-root-metadata', 'ROOT_METADATA', 'Path to root metadata'),
        ('--bundle', 'BUNDLE', 'Path to bundle'),
    ):
        required.add_argument(
            flag,
            help=description,
            metavar=metavar,
            required=True,
            type=Path,
        )
292
293
def create_empty_bundle_handler(arg) -> None:
    """Create an empty bundle and write it to ``arg.pathname``.

    The bundle comes from ``update_bundle.gen_empty_update_bundle`` called
    with ``arg.target_metadata_version``. An IOError is printed instead of
    being propagated.
    """

    try:
        version = arg.target_metadata_version
        empty_bundle = update_bundle.gen_empty_update_bundle(version)
        arg.pathname.write_bytes(empty_bundle.SerializeToString())

    except IOError as error:
        print(error)
305
306
def _new_create_empty_bundle_parser(subparsers) -> None:
    """Register the ``create-empty-bundle`` subcommand on ``subparsers``.

    Takes a positional output ``pathname`` and an optional integer
    ``--target-metadata-version`` that defaults to 1; dispatches to
    ``create_empty_bundle_handler`` through the ``func`` default.
    """

    def formatter_class(prog):
        # argparse passes ``prog`` by keyword, so the name must stay as is.
        return argparse.HelpFormatter(prog, max_help_position=100, width=200)

    parser = subparsers.add_parser(
        'create-empty-bundle',
        description='Creation of an empty bundle',
        formatter_class=formatter_class,
        help="",
    )
    parser.set_defaults(func=create_empty_bundle_handler)
    parser.add_argument(
        'pathname', type=Path, help='Path to newly created empty bundle'
    )
    parser.add_argument(
        '--target-metadata-version',
        help='Version number for targets metadata; Defaults to 1',
        metavar='VERSION',
        type=int,
        default=1,
        required=False,
    )
331
332
def inspect_root_metadata_handler(arg) -> None:
    """Print the root metadata stored at ``arg.pathname``.

    The file is parsed as a ``SignedRootMetadata`` message; its embedded
    ``RootMetadata`` is deserialized and printed, followed by the number of
    signatures attached to the signed message. An IOError is printed
    instead of being propagated.
    """

    try:
        signed = SignedRootMetadata.FromString(arg.pathname.read_bytes())
        root_view = RootMetadata.FromString(signed.serialized_root_metadata)
        print(root_view)

        signature_count = len(signed.signatures)
        print(f'Number of signatures found: {signature_count}')

    except IOError as error:
        print(error)
354
355
def _new_inspect_root_metadata_parser(subparsers) -> None:
    """Register the ``inspect-root-metadata`` subcommand on ``subparsers``.

    The subcommand takes one positional ``pathname`` and dispatches to
    ``inspect_root_metadata_handler`` through the ``func`` default.
    """

    def formatter_class(prog):
        # argparse passes ``prog`` by keyword, so the name must stay as is.
        return argparse.HelpFormatter(prog, max_help_position=100, width=200)

    parser = subparsers.add_parser(
        'inspect-root-metadata',
        description='Outputs contents of root metadata',
        formatter_class=formatter_class,
        help="",
    )
    parser.set_defaults(func=inspect_root_metadata_handler)
    parser.add_argument('pathname', type=Path, help='Path to root metadata')
375
376
def sign_root_metadata_handler(arg) -> None:
    """Sign the root metadata at ``arg.root_metadata``.

    The file is parsed as ``SignedRootMetadata``, passed to
    ``dev_sign.sign_root_metadata`` together with the raw bytes of
    ``arg.root_key``, and the result is written back to the same path.
    An IOError is printed instead of being propagated.
    """

    try:
        current = SignedRootMetadata.FromString(
            arg.root_metadata.read_bytes()
        )
        root_key = arg.root_key.read_bytes()
        signed = dev_sign.sign_root_metadata(current, root_key)
        # Overwrites the input root metadata file with the signed version.
        arg.root_metadata.write_bytes(signed.SerializeToString())

    except IOError as error:
        print(error)
389
390
def _new_sign_root_metadata_parser(subparsers) -> None:
    """Register the ``sign-root-metadata`` subcommand on ``subparsers``.

    Both ``--root-metadata`` and ``--root-key`` are required paths; the
    subcommand dispatches to ``sign_root_metadata_handler`` through the
    ``func`` default.
    """

    def formatter_class(prog):
        # argparse passes ``prog`` by keyword, so the name must stay as is.
        return argparse.HelpFormatter(prog, max_help_position=100, width=200)

    parser = subparsers.add_parser(
        'sign-root-metadata',
        description='Signing of root metadata',
        formatter_class=formatter_class,
        help="",
    )
    parser.set_defaults(func=sign_root_metadata_handler)

    required = parser.add_argument_group('required arguments')
    for flag, metavar, description in (
        ('--root-metadata', 'ROOT_METADATA', 'Root metadata to be signed'),
        ('--root-key', 'ROOT_KEY', 'Root signing key'),
    ):
        required.add_argument(
            flag,
            help=description,
            metavar=metavar,
            required=True,
            type=Path,
        )
422
423
def create_root_metadata_handler(arg) -> None:
    """Create root metadata by delegating to ``root_metadata.main``.

    Passes the output path, the root key paths, the targets key paths and
    the version from the parsed arguments, in that order. An IOError is
    printed instead of being propagated.
    """

    try:
        output_path = arg.out
        root_keys = arg.append_root_key
        targets_keys = arg.append_targets_key
        root_metadata.main(output_path, root_keys, targets_keys, arg.version)

        # TODO(eashansingh): Print message that allows user
        # to visualize root metadata with
        # `pw update inspect-root-metadata` command

    except IOError as error:
        print(error)
438
439
def _new_create_root_metadata_parser(subparsers) -> None:
    """Register the ``create-root-metadata`` subcommand on ``subparsers``.

    ``--version`` is an optional integer defaulting to 1. The repeatable
    ``--append-root-key`` and ``--append-targets-key`` options and the
    ``-o/--out`` path are required. The subcommand dispatches to
    ``create_root_metadata_handler`` through the ``func`` default.
    """

    def formatter_class(prog):
        # argparse passes ``prog`` by keyword, so the name must stay as is.
        return argparse.HelpFormatter(prog, max_help_position=100, width=200)

    parser = subparsers.add_parser(
        'create-root-metadata',
        description='Creation of root metadata',
        formatter_class=formatter_class,
        help='',
    )
    parser.set_defaults(func=create_root_metadata_handler)
    parser.add_argument(
        '--version',
        help='Canonical version number for rollback checks; Defaults to 1',
        type=int,
        default=1,
        required=False,
    )

    required = parser.add_argument_group('required arguments')
    # Each key option may be given several times; values accumulate in a
    # list because of action='append'.
    for flag, metavar, description in (
        ('--append-root-key', 'ROOT_KEY', 'Path to root key'),
        ('--append-targets-key', 'TARGETS_KEY', 'Path to targets key'),
    ):
        required.add_argument(
            flag,
            help=description,
            metavar=metavar,
            required=True,
            action='append',
            type=Path,
        )
    required.add_argument(
        '-o', '--out', help='Path to output file', required=True, type=Path
    )
483
484
def generate_key_handler(arg) -> None:
    """Generate a key pair via ``keys.gen_ecdsa_keypair(arg.pathname)``.

    After generation, prints ``arg.pathname`` as the private key location
    and the same path with ``.pub`` appended as the public key location.
    An IOError is printed instead of being propagated.
    """

    try:
        key_path = arg.pathname
        keys.gen_ecdsa_keypair(key_path)
        print(f'Private key: {key_path!s}')
        print(f'Public key: {key_path!s}.pub')

    except IOError as error:
        print(error)
495
496
def _new_generate_key_parser(subparsers) -> None:
    """Register the ``generate-key`` subcommand on ``subparsers``.

    The subcommand takes one positional ``pathname`` and dispatches to
    ``generate_key_handler`` through the ``func`` default.
    """

    description = (
        'Generates an ecdsa-sha2-nistp256 signing key pair '
        '(private + public)'
    )
    parser = subparsers.add_parser(
        'generate-key', description=description, help=''
    )
    parser.set_defaults(func=generate_key_handler)
    parser.add_argument(
        'pathname', type=Path, help='Path to generated key pair'
    )
512
513
def _parse_args() -> argparse.Namespace:
    """Build the root parser with every subcommand and parse the arguments.

    The root parser's default ``func`` prints the top-level help; each
    subcommand registers its own ``func`` default.
    """
    root = argparse.ArgumentParser(
        description='Software update related operations.',
        epilog='Learn more at: pigweed.dev/pw_software_update',
    )

    def show_help(*_args, **_kwargs):
        return root.print_help()

    root.set_defaults(func=show_help)

    subcommands = root.add_subparsers()

    # Registration order is kept: key generation, then root metadata, then
    # bundle related subcommands.
    registrars = (
        _new_generate_key_parser,
        _new_create_root_metadata_parser,
        _new_sign_root_metadata_parser,
        _new_inspect_root_metadata_parser,
        _new_create_empty_bundle_parser,
        _new_add_root_metadata_to_bundle_parser,
        _new_add_file_to_bundle_parser,
        _new_sign_bundle_parser,
        _new_inspect_bundle_parser,
    )
    for register in registrars:
        register(subcommands)

    return root.parse_args()
541
542
def _dispatch_command(args) -> None:
    """Invoke the handler stored in ``args.func``, passing ``args`` to it."""
    handler = args.func
    handler(args)
545
546
def main() -> int:
    """Software update command-line interface(WIP).

    Parses the command line, runs the selected handler and returns 0.
    """
    parsed_args = _parse_args()
    _dispatch_command(parsed_args)
    return 0
551
552
if __name__ == '__main__':
    # Use main()'s return value as the process exit status.
    exit_status = main()
    sys.exit(exit_status)
555