• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2022 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14"""
15Software update related operations.
16
17Learn more at: pigweed.dev/pw_software_update
18
19"""
20
21from __future__ import annotations
22
23import argparse
24import os
25import sys
26from pathlib import Path
27
28from pw_software_update import (
29    dev_sign,
30    keys,
31    metadata,
32    root_metadata,
33    update_bundle,
34)
35from pw_software_update.tuf_pb2 import (
36    RootMetadata,
37    SignedRootMetadata,
38    TargetsMetadata,
39)
40from pw_software_update.update_bundle_pb2 import UpdateBundle
41
42
def inspect_bundle_handler(arg) -> None:
    """Prints a human-readable dump of the bundle stored at `arg.pathname`.

    Output order: the bundle's 'targets' metadata, one summary per target
    payload (name, length, and the first 32 bytes shown both as a bytes repr
    and as hex), and finally the root metadata. An `IOError` raised while
    doing so is printed rather than propagated.
    """

    try:
        bundle = UpdateBundle.FromString(arg.pathname.read_bytes())

        targets = TargetsMetadata.FromString(
            bundle.targets_metadata['targets'].serialized_targets_metadata
        )
        print('Targets Metadata:')
        print('=================')
        print(targets)

        print('\nTarget Files:')
        print('=============')
        payloads = bundle.target_payloads
        for index, (name, contents) in enumerate(payloads.items(), start=1):
            # Only a short prefix is shown so large payloads stay readable.
            head = contents[:32]
            print(f'{index} of {len(payloads)}:')
            print(f'  filename: {name}')
            print(f'  length: {len(contents)}')
            print(f'  ascii contents(first 32 bytes): {head!r}')
            print(f'  hex contents(first 32 bytes): {head.hex()}\n')

        root = RootMetadata.FromString(
            bundle.root_metadata.serialized_root_metadata
        )
        print('\nRoot Metadata:')
        print('==============')
        print(root)

    except IOError as error:
        print(error)
77
78
def _new_inspect_bundle_parser(subparsers) -> None:
    """Registers the `inspect-bundle` subcommand on `subparsers`.

    The subcommand takes one positional `pathname` and dispatches to
    `inspect_bundle_handler`.
    """

    def wide_help_formatter(prog):
        # Help formatter configured with width 200 and help position <= 100.
        return argparse.HelpFormatter(prog, max_help_position=100, width=200)

    parser = subparsers.add_parser(
        'inspect-bundle',
        help='',
        formatter_class=wide_help_formatter,
        description='Outputs contents of bundle',
    )
    parser.add_argument('pathname', help='Path to bundle', type=Path)
    parser.set_defaults(func=inspect_bundle_handler)
96
97
def sign_bundle_handler(arg) -> None:
    """Signs the bundle at `arg.bundle` in place using the key at `arg.key`.

    The bundle file is parsed, passed with the raw key bytes to
    `dev_sign.sign_update_bundle`, and the result is written back over the
    same path. An `IOError` is printed rather than propagated.
    """

    try:
        bundle_path = arg.bundle
        unsigned_bundle = UpdateBundle.FromString(bundle_path.read_bytes())
        key_bytes = arg.key.read_bytes()

        signed_bundle = dev_sign.sign_update_bundle(unsigned_bundle, key_bytes)
        bundle_path.write_bytes(signed_bundle.SerializeToString())

    except IOError as error:
        print(error)
110
111
def _new_sign_bundle_parser(subparsers) -> None:
    """Registers the `sign-bundle` subcommand on `subparsers`.

    `--bundle` and `--key` are both mandatory paths; the subcommand
    dispatches to `sign_bundle_handler`.
    """

    def wide_help_formatter(prog):
        # Help formatter configured with width 200 and help position <= 100.
        return argparse.HelpFormatter(prog, max_help_position=100, width=200)

    parser = subparsers.add_parser(
        'sign-bundle',
        help='',
        formatter_class=wide_help_formatter,
        description='Sign an existing bundle using a development key',
    )
    parser.set_defaults(func=sign_bundle_handler)

    required = parser.add_argument_group('required arguments')
    # (flag, metavar, help) for each mandatory path option, in help order.
    for flag, metavar, help_text in (
        ('--bundle', 'BUNDLE', 'Bundle to be signed'),
        ('--key', 'KEY', 'Bundle signing key'),
    ):
        required.add_argument(
            flag,
            type=Path,
            required=True,
            metavar=metavar,
            help=help_text,
        )
143
144
def add_file_to_bundle(
    bundle: UpdateBundle, file_name: str, file_contents: bytes
) -> UpdateBundle:
    """Adds a target file to an existing bundle and returns the bundle.

    The payload is stored under `file_name` in `bundle.target_payloads`, and
    an entry produced by `metadata.gen_target_file` is appended to the
    bundle's 'targets' metadata, which is then re-serialized into the bundle.

    Args:
      bundle: The bundle to extend. It is modified in place.
      file_name: Name under which the target is registered.
      file_contents: Raw bytes of the target file.

    Returns:
      The same `bundle` object that was passed in, after the update.

    Raises:
      ValueError: `file_name` is already present in the bundle's payloads or
        in its targets metadata. Neither the payloads nor the serialized
        targets metadata are changed in that case.
    """

    if file_name in bundle.target_payloads:
        raise ValueError(f'File name {file_name} already exists in bundle')

    targets_metadata = TargetsMetadata.FromString(
        bundle.targets_metadata['targets'].serialized_targets_metadata
    )
    if any(
        target.file_name == file_name
        for target in targets_metadata.target_files
    ):
        raise ValueError(f'File name {file_name} already exists in bundle')

    # Build the new metadata entry before touching the bundle so that a
    # failure here cannot leave a payload without matching metadata.
    targets_metadata.target_files.append(
        metadata.gen_target_file(file_name, file_contents)
    )

    # Every check has passed; apply both halves of the update together.
    bundle.target_payloads[file_name] = file_contents
    bundle.targets_metadata[
        'targets'
    ].serialized_targets_metadata = targets_metadata.SerializeToString()

    return bundle
181
182
def add_file_to_bundle_handler(arg) -> None:
    """Adds the file at `arg.file` to the bundle at `arg.bundle`, in place.

    The target is named `arg.new_name` when that is set; otherwise the name
    is the base name of `arg.file` with its last extension removed. The
    updated bundle is written back over `arg.bundle`. An `IOError` is printed
    rather than propagated; the exception `add_file_to_bundle` raises for a
    name that already exists is not caught here.
    """

    try:
        # Fall back to the file's own name (minus extension) if no override.
        file_name = (
            arg.new_name
            or os.path.splitext(os.path.basename(arg.file))[0]
        )

        original_bundle = UpdateBundle.FromString(arg.bundle.read_bytes())
        payload = arg.file.read_bytes()
        updated_bundle = add_file_to_bundle(
            bundle=original_bundle,
            file_name=file_name,
            file_contents=payload,
        )

        arg.bundle.write_bytes(updated_bundle.SerializeToString())

    except IOError as error:
        print(error)
205
206
def _new_add_file_to_bundle_parser(subparsers) -> None:
    """Registers the `add-file-to-bundle` subcommand on `subparsers`.

    `--bundle` and `--file` are mandatory and are listed under a
    'required arguments' heading in the help output. `--new-name` is
    optional, so it is registered on the parser itself and appears with the
    regular options instead of being mislabeled as required. The subcommand
    dispatches to `add_file_to_bundle_handler`.
    """

    formatter_class = lambda prog: argparse.HelpFormatter(
        prog, max_help_position=100, width=200
    )
    add_file_to_bundle_parser = subparsers.add_parser(
        'add-file-to-bundle',
        description='Add a file to an existing bundle',
        formatter_class=formatter_class,
        help="",
    )
    add_file_to_bundle_parser.set_defaults(func=add_file_to_bundle_handler)
    required_arguments = add_file_to_bundle_parser.add_argument_group(
        'required arguments'
    )
    required_arguments.add_argument(
        '--bundle',
        help='Path to an existing bundle',
        metavar='BUNDLE',
        required=True,
        type=Path,
    )
    required_arguments.add_argument(
        '--file',
        help='Path to a target file',
        metavar='FILE_PATH',
        required=True,
        type=Path,
    )
    # Optional flag: kept out of the 'required arguments' group on purpose.
    add_file_to_bundle_parser.add_argument(
        '--new-name',
        help='Optional new name for target',
        metavar='NEW_NAME',
        required=False,
        type=str,
    )
244
245
def add_root_metadata_to_bundle_handler(arg) -> None:
    """Copies signed root metadata into the bundle at `arg.bundle`, in place.

    The signed root metadata is parsed from `arg.append_root_metadata` and
    copied over the bundle's `root_metadata` field; the bundle is then written
    back to `arg.bundle`. An `IOError` is printed rather than propagated.
    """

    try:
        bundle = UpdateBundle.FromString(arg.bundle.read_bytes())
        signed_root = SignedRootMetadata.FromString(
            arg.append_root_metadata.read_bytes()
        )
        bundle.root_metadata.CopyFrom(signed_root)

        arg.bundle.write_bytes(bundle.SerializeToString())

    except IOError as error:
        print(error)
260
261
def _new_add_root_metadata_to_bundle_parser(subparsers) -> None:
    """Registers the `add-root-metadata-to-bundle` subcommand.

    `--append-root-metadata` and `--bundle` are both mandatory paths; the
    subcommand dispatches to `add_root_metadata_to_bundle_handler`.
    """

    def wide_help_formatter(prog):
        # Help formatter configured with width 200 and help position <= 100.
        return argparse.HelpFormatter(prog, max_help_position=100, width=200)

    parser = subparsers.add_parser(
        'add-root-metadata-to-bundle',
        help='',
        formatter_class=wide_help_formatter,
        description='Add root metadata to a bundle',
    )
    parser.set_defaults(func=add_root_metadata_to_bundle_handler)

    required = parser.add_argument_group('required arguments')
    # (flag, metavar, help) for each mandatory path option, in help order.
    for flag, metavar, help_text in (
        ('--append-root-metadata', 'ROOT_METADATA', 'Path to root metadata'),
        ('--bundle', 'BUNDLE', 'Path to bundle'),
    ):
        required.add_argument(
            flag,
            type=Path,
            required=True,
            metavar=metavar,
            help=help_text,
        )
294
295
def create_empty_bundle_handler(arg) -> None:
    """Creates an empty bundle and writes it to `arg.pathname`.

    The bundle comes from `update_bundle.gen_empty_update_bundle`, called with
    `arg.target_metadata_version`. An `IOError` is printed rather than
    propagated.
    """

    try:
        version = arg.target_metadata_version
        empty_bundle = update_bundle.gen_empty_update_bundle(version)
        serialized = empty_bundle.SerializeToString()
        arg.pathname.write_bytes(serialized)

    except IOError as error:
        print(error)
307
308
def _new_create_empty_bundle_parser(subparsers) -> None:
    """Registers the `create-empty-bundle` subcommand on `subparsers`.

    Takes a positional output `pathname` and an optional integer
    `--target-metadata-version` (default 1); dispatches to
    `create_empty_bundle_handler`.
    """

    def wide_help_formatter(prog):
        # Help formatter configured with width 200 and help position <= 100.
        return argparse.HelpFormatter(prog, max_help_position=100, width=200)

    parser = subparsers.add_parser(
        'create-empty-bundle',
        help='',
        formatter_class=wide_help_formatter,
        description='Creation of an empty bundle',
    )
    parser.set_defaults(func=create_empty_bundle_handler)

    parser.add_argument(
        'pathname', help='Path to newly created empty bundle', type=Path
    )
    parser.add_argument(
        '--target-metadata-version',
        type=int,
        default=1,
        required=False,
        metavar='VERSION',
        help='Version number for targets metadata; Defaults to 1',
    )
333
334
def inspect_root_metadata_handler(arg) -> None:
    """Prints the root metadata stored at `arg.pathname`.

    The file is parsed as a `SignedRootMetadata` message; its embedded
    `RootMetadata` is deserialized and printed, followed by the number of
    signatures attached to the signed message. An `IOError` is printed rather
    than propagated.
    """

    try:
        signed = SignedRootMetadata.FromString(arg.pathname.read_bytes())

        print(RootMetadata.FromString(signed.serialized_root_metadata))
        print(f'Number of signatures found: {len(signed.signatures)}')

    except IOError as error:
        print(error)
356
357
def _new_inspect_root_metadata_parser(subparsers) -> None:
    """Registers the `inspect-root-metadata` subcommand on `subparsers`.

    The subcommand takes one positional `pathname` and dispatches to
    `inspect_root_metadata_handler`.
    """

    def wide_help_formatter(prog):
        # Help formatter configured with width 200 and help position <= 100.
        return argparse.HelpFormatter(prog, max_help_position=100, width=200)

    parser = subparsers.add_parser(
        'inspect-root-metadata',
        help='',
        formatter_class=wide_help_formatter,
        description='Outputs contents of root metadata',
    )
    parser.add_argument('pathname', help='Path to root metadata', type=Path)
    parser.set_defaults(func=inspect_root_metadata_handler)
377
378
def sign_root_metadata_handler(arg) -> None:
    """Signs the root metadata at `arg.root_metadata` in place.

    The file is parsed as `SignedRootMetadata`, passed with the raw bytes of
    `arg.root_key` to `dev_sign.sign_root_metadata`, and the result is written
    back over the same path. An `IOError` is printed rather than propagated.
    """

    try:
        metadata_path = arg.root_metadata
        unsigned = SignedRootMetadata.FromString(metadata_path.read_bytes())
        key_bytes = arg.root_key.read_bytes()

        signed = dev_sign.sign_root_metadata(unsigned, key_bytes)
        metadata_path.write_bytes(signed.SerializeToString())

    except IOError as error:
        print(error)
391
392
def _new_sign_root_metadata_parser(subparsers) -> None:
    """Registers the `sign-root-metadata` subcommand on `subparsers`.

    `--root-metadata` and `--root-key` are both mandatory paths; the
    subcommand dispatches to `sign_root_metadata_handler`.
    """

    def wide_help_formatter(prog):
        # Help formatter configured with width 200 and help position <= 100.
        return argparse.HelpFormatter(prog, max_help_position=100, width=200)

    parser = subparsers.add_parser(
        'sign-root-metadata',
        help='',
        formatter_class=wide_help_formatter,
        description='Signing of root metadata',
    )
    parser.set_defaults(func=sign_root_metadata_handler)

    required = parser.add_argument_group('required arguments')
    # (flag, metavar, help) for each mandatory path option, in help order.
    for flag, metavar, help_text in (
        ('--root-metadata', 'ROOT_METADATA', 'Root metadata to be signed'),
        ('--root-key', 'ROOT_KEY', 'Root signing key'),
    ):
        required.add_argument(
            flag,
            type=Path,
            required=True,
            metavar=metavar,
            help=help_text,
        )
424
425
def create_root_metadata_handler(arg) -> None:
    """Forwards the parsed `create-root-metadata` options to
    `root_metadata.main`.

    The output path, the root key paths, the targets key paths and the
    version are passed positionally, in that order. An `IOError` is printed
    rather than propagated.
    """

    try:
        out_path = arg.out
        root_keys = arg.append_root_key
        targets_keys = arg.append_targets_key
        root_metadata.main(out_path, root_keys, targets_keys, arg.version)

        # TODO(eashansingh): Print message that allows user
        # to visualize root metadata with
        # `pw update inspect-root-metadata` command

    except IOError as error:
        print(error)
440
441
def _new_create_root_metadata_parser(subparsers) -> None:
    """Registers the `create-root-metadata` subcommand on `subparsers`.

    `--version` is an optional integer (default 1). `--append-root-key` and
    `--append-targets-key` are mandatory and may be repeated, each occurrence
    appending one path. `-o/--out` is the mandatory output path. The
    subcommand dispatches to `create_root_metadata_handler`.
    """

    def wide_help_formatter(prog):
        # Help formatter configured with width 200 and help position <= 100.
        return argparse.HelpFormatter(prog, max_help_position=100, width=200)

    parser = subparsers.add_parser(
        'create-root-metadata',
        help='',
        formatter_class=wide_help_formatter,
        description='Creation of root metadata',
    )
    parser.set_defaults(func=create_root_metadata_handler)
    parser.add_argument(
        '--version',
        type=int,
        default=1,
        required=False,
        help='Canonical version number for rollback checks; Defaults to 1',
    )

    required = parser.add_argument_group('required arguments')
    # Repeatable key options: (flag, metavar, help), in help order.
    for flag, metavar, help_text in (
        ('--append-root-key', 'ROOT_KEY', 'Path to root key'),
        ('--append-targets-key', 'TARGETS_KEY', 'Path to targets key'),
    ):
        required.add_argument(
            flag,
            type=Path,
            action='append',
            required=True,
            metavar=metavar,
            help=help_text,
        )
    required.add_argument(
        '-o', '--out', type=Path, required=True, help='Path to output file'
    )
485
486
def generate_key_handler(arg) -> None:
    """Generates a key pair via `keys.gen_ecdsa_keypair(arg.pathname)`.

    Afterwards prints `arg.pathname` as the private key location and the same
    path with '.pub' appended as the public key location. An `IOError` is
    printed rather than propagated.
    """

    try:
        key_path = arg.pathname
        keys.gen_ecdsa_keypair(key_path)
        print(f'Private key: {key_path}')
        print(f'Public key: {key_path}.pub')

    except IOError as error:
        print(error)
497
498
def _new_generate_key_parser(subparsers) -> None:
    """Registers the `generate-key` subcommand on `subparsers`.

    The subcommand takes one positional `pathname` and dispatches to
    `generate_key_handler`.
    """

    description = (
        'Generates an ecdsa-sha2-nistp256 signing key pair (private + public)'
    )
    parser = subparsers.add_parser(
        'generate-key', help='', description=description
    )
    parser.add_argument(
        'pathname', help='Path to generated key pair', type=Path
    )
    parser.set_defaults(func=generate_key_handler)
514
515
def _parse_args() -> argparse.Namespace:
    """Builds the root parser with all subcommands and parses the command
    line.

    When no subcommand is selected, the returned namespace's `func` prints the
    root parser's help text.
    """

    parser_root = argparse.ArgumentParser(
        description='Software update related operations.',
        epilog='Learn more at: pigweed.dev/pw_software_update',
    )

    def print_root_help(*_args, **_kwargs):
        # Default action: accept whatever the dispatcher passes, show help.
        return parser_root.print_help()

    parser_root.set_defaults(func=print_root_help)

    subparsers = parser_root.add_subparsers()

    # Registration order determines the order subcommands appear in help.
    registrars = (
        # Key generation related parsers
        _new_generate_key_parser,
        # Root metadata related parsers
        _new_create_root_metadata_parser,
        _new_sign_root_metadata_parser,
        _new_inspect_root_metadata_parser,
        # Bundle related parsers
        _new_create_empty_bundle_parser,
        _new_add_root_metadata_to_bundle_parser,
        _new_add_file_to_bundle_parser,
        _new_sign_bundle_parser,
        _new_inspect_bundle_parser,
    )
    for register in registrars:
        register(subparsers)

    return parser_root.parse_args()
543
544
def _dispatch_command(args) -> None:
    """Calls the handler stored in `args.func`, passing it `args`."""
    handler = args.func
    handler(args)
547
548
def main() -> int:
    """Runs the software update command-line interface (WIP).

    Parses the command line and invokes the selected subcommand's handler.

    Returns:
      Always 0.
    """
    parsed_args = _parse_args()
    _dispatch_command(parsed_args)
    return 0
553
554
if __name__ == '__main__':
    # Use main()'s return value as the process exit status.
    exit_status = main()
    sys.exit(exit_status)
557