#!/usr/bin/env python
#
# Copyright 2018 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
#
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
r"""Helper tool for performing an authenticated AVB unlock of an Android Things device.

This tool communicates with an Android Things device over fastboot to perform an
authenticated AVB unlock. The user provides unlock credentials valid for the
device they want to unlock, likely obtained from the Android Things Developer
Console. The tool handles the sequence of fastboot commands to complete the
challenge-response unlock protocol.

Unlock credentials can be provided to the tool in one of two ways:

  1) by providing paths to the individual credential files using the
     '--pik_cert', '--puk_cert', and '--puk' command line switches, or

  2) by providing a path to a zip archive containing the three credential files,
     named as follows:
       - Product Intermediate Key (PIK) certificate: 'pik_certificate.*\.bin'
       - Product Unlock Key (PUK) certificate: 'puk_certificate.*\.bin'
       - PUK private key: 'puk.*\.pem'

     You can also provide one or more archives and/or one or more directories
     containing such zip archives. In either scenario, the tool will search all
     of the provided credential archives for a match against the product ID of
     the device being unlocked and automatically use the first match.

This tool also clears the factory partition persistent digest unless the
--clear_factory_digest=false option is used. There is no harm to clear this
digest even if changes to the factory partition are not planned.

Dependencies:
  - Python 2.7.x, 3.2.x, or newer (for argparse)
  - PyCrypto 2.5 or newer (for PKCS1_v1_5 and RSA PKCS#8 PEM key import)
  - Android SDK Platform Tools (for fastboot), in PATH
    - https://developer.android.com/studio/releases/platform-tools
"""

HELP_DESCRIPTION = """Performs an authenticated AVB unlock of an Android Things device over
fastboot, given valid unlock credentials for the device."""

HELP_USAGE = """
  %(prog)s [-h] [-v] [-s SERIAL] [--clear_factory_digest=true|false] unlock_creds.zip [unlock_creds_2.zip ...]
  %(prog)s --pik_cert pik_cert.bin --puk_cert puk_cert.bin --puk puk.pem"""

HELP_EPILOG = """examples:
  %(prog)s unlock_creds.zip
  %(prog)s unlock_creds.zip unlock_creds_2.zip -s SERIAL
  %(prog)s path_to_dir_with_multiple_unlock_creds/
  %(prog)s --pik_cert pik_cert.bin --puk_cert puk_cert.bin --puk puk.pem"""

import sys

# The version check must run before importing argparse, which does not exist
# on older interpreters.
ver = sys.version_info
if (ver[0] < 2) or (ver[0] == 2 and ver[1] < 7) or (ver[0] == 3 and ver[1] < 2):
  print('This script requires Python 2.7+ or 3.2+')
  sys.exit(1)

import argparse
import binascii
import os
import re
import shutil
import struct
import subprocess
import tempfile
import zipfile

# Requires PyCrypto 2.5 (or newer) for PKCS1_v1_5 and support for importing
# PEM-encoded RSA keys
try:
  from Crypto.Hash import SHA512
  from Crypto.PublicKey import RSA
  from Crypto.Signature import PKCS1_v1_5
except ImportError as e:
  print('PyCrypto 2.5 or newer required, missing or too old: ' + str(e))


def _RunFastboot(args, serial=None, verbose=False, echo_prefix='$ '):
  """Runs a single fastboot command and returns its combined output as text.

  Arguments:
    args: List of fastboot arguments, e.g. ['getvar', 'at-vboot-state'].
    serial: [optional] Value passed to fastboot's '-s' switch to select the
      device. Omitted from the command line when falsy.
    verbose: [optional] When true, print the command line before running it
      and the command output afterwards.
    echo_prefix: [optional] Text printed in front of the echoed command line
      in verbose mode.

  Returns:
    The UTF-8 decoded stdout+stderr of the fastboot invocation.

  Raises:
    subprocess.CalledProcessError: If fastboot exits with a non-zero status.
  """
  cmd = ['fastboot'] + (['-s', serial] if serial else []) + args
  if verbose:
    print(echo_prefix + ' '.join(cmd))

  # stderr is folded into stdout so the returned text has all of the output.
  out = subprocess.check_output(cmd, stderr=subprocess.STDOUT).decode('utf-8')

  if verbose:
    print(out)
  return out


def _PrintFastbootFailure(error):
  """Prints the captured output and exit status of a failed fastboot command.

  Arguments:
    error: The subprocess.CalledProcessError raised by the failed command.
  """
  print(error.output.decode('utf-8'))
  print("Command '{}' returned non-zero exit status {}".format(
      ' '.join(error.cmd), error.returncode))


class UnlockCredentials(object):
  """Helper data container class for the 3 unlock credentials involved in an AVB authenticated unlock operation.

  The three credentials are the Product Intermediate Key (PIK) certificate, the
  Product Unlock Key (PUK) certificate and the PUK private key. All three are
  read and validated when the object is constructed.
  """

  def __init__(self,
               intermediate_cert_file,
               unlock_cert_file,
               unlock_key_file,
               source_file=None):
    """Loads and sanity-checks the three credential files.

    Arguments:
      intermediate_cert_file: Path to the PIK certificate file.
      unlock_cert_file: Path to the PUK certificate file.
      unlock_key_file: Path to the PUK RSA private key file.
      source_file: [optional] Name of the archive the credentials came from;
        stored as-is and exposed through the source_file property.

    Raises:
      ValueError: If a certificate has an unexpected length or the key is not
        an RSA private key.
    """
    # The certificates are AvbAtxCertificate structs as defined in libavb_atx,
    # not an X.509 certificate. Do a basic length sanity check when reading
    # them.
    EXPECTED_CERTIFICATE_SIZE = 1620

    with open(intermediate_cert_file, 'rb') as f:
      self._intermediate_cert = f.read()
      if len(self._intermediate_cert) != EXPECTED_CERTIFICATE_SIZE:
        raise ValueError('Invalid intermediate key certificate length.')

    with open(unlock_cert_file, 'rb') as f:
      self._unlock_cert = f.read()
      if len(self._unlock_cert) != EXPECTED_CERTIFICATE_SIZE:
        raise ValueError('Invalid product unlock key certificate length.')

    with open(unlock_key_file, 'rb') as f:
      self._unlock_key = RSA.importKey(f.read())
      if not self._unlock_key.has_private():
        raise ValueError('Unlock key was not an RSA private key.')

    self._source_file = source_file

  @property
  def intermediate_cert(self):
    """Raw bytes of the PIK certificate."""
    return self._intermediate_cert

  @property
  def unlock_cert(self):
    """Raw bytes of the PUK certificate."""
    return self._unlock_cert

  @property
  def unlock_key(self):
    """The imported PUK RSA private key object."""
    return self._unlock_key

  @property
  def source_file(self):
    """The source_file value given at construction (None if not given)."""
    return self._source_file

  @classmethod
  def from_credential_archive(cls, archive):
    r"""Create UnlockCredentials from an unlock credential zip archive.

    The zip archive must contain the following three credential files, named as
    follows:
      - Product Intermediate Key (PIK) certificate: 'pik_certificate.*\.bin'
      - Product Unlock Key (PUK) certificate: 'puk_certificate.*\.bin'
      - PUK private key: 'puk.*\.pem'

    The three files are unpacked into a temporary directory which is always
    removed again before this method returns or raises.

    Arguments:
      - archive: Filename of zip archive containing unlock credentials.

    Raises:
      ValueError: If archive is either missing a required file or contains
        multiple files matching one of the filename formats.
    """

    def _find_one_match(contents, regex, desc):
      # Returns the single archive member matching regex, or raises ValueError
      # if there are zero or several candidates.
      r = re.compile(regex)
      matches = list(filter(r.search, contents))
      if not matches:
        raise ValueError(
            "Couldn't find {} file (matching regex '{}') in archive {}".format(
                desc, regex, archive))
      elif len(matches) > 1:
        raise ValueError(
            "Found multiple files for {} (matching regex '{}') in archive {}"
            .format(desc, regex, archive))
      return matches[0]

    tempdir = tempfile.mkdtemp()
    try:
      with zipfile.ZipFile(archive, mode='r') as archive_zip:
        contents = archive_zip.namelist()

        pik_cert_re = r'^pik_certificate.*\.bin$'
        pik_cert = _find_one_match(contents, pik_cert_re,
                                   'intermediate key (PIK) certificate')

        puk_cert_re = r'^puk_certificate.*\.bin$'
        puk_cert = _find_one_match(contents, puk_cert_re,
                                   'unlock key (PUK) certificate')

        puk_re = r'^puk.*\.pem$'
        puk = _find_one_match(contents, puk_re, 'unlock key (PUK)')

        archive_zip.extractall(path=tempdir, members=[pik_cert, puk_cert, puk])

        return cls(
            intermediate_cert_file=os.path.join(tempdir, pik_cert),
            unlock_cert_file=os.path.join(tempdir, puk_cert),
            unlock_key_file=os.path.join(tempdir, puk),
            source_file=archive)
    finally:
      shutil.rmtree(tempdir)


class UnlockChallenge(object):
  """Helper class for parsing the AvbAtxUnlockChallenge struct returned from 'fastboot oem at-get-vboot-unlock-challenge'.

  The file provided to the constructor should be the full 52-byte
  AvbAtxUnlockChallenge struct, not just the challenge itself.
  """

  def __init__(self, challenge_file):
    """Reads and unpacks the challenge struct from challenge_file.

    Arguments:
      challenge_file: Path to a file holding the 52-byte challenge struct.

    Raises:
      ValueError: If the file is not exactly 52 bytes long.
    """
    CHALLENGE_STRUCT_SIZE = 52
    PRODUCT_ID_HASH_SIZE = 32
    CHALLENGE_DATA_SIZE = 16
    with open(challenge_file, 'rb') as f:
      data = f.read()
      if len(data) != CHALLENGE_STRUCT_SIZE:
        raise ValueError('Invalid unlock challenge length.')

      # Layout: little-endian uint32 version, 32-byte product ID hash,
      # 16-byte challenge.
      self._version, self._product_id_hash, self._challenge_data = struct.unpack(
          '<I{}s{}s'.format(PRODUCT_ID_HASH_SIZE, CHALLENGE_DATA_SIZE), data)

  @property
  def version(self):
    """Format version field of the challenge struct (int)."""
    return self._version

  @property
  def product_id_hash(self):
    """The 32-byte product ID hash field of the challenge struct."""
    return self._product_id_hash

  @property
  def challenge_data(self):
    """The 16-byte challenge field of the challenge struct."""
    return self._challenge_data


def GetAtxCertificateSubject(cert):
  """Parses and returns the subject field from the given AvbAtxCertificate struct.

  Arguments:
    cert: Raw bytes of an AvbAtxCertificate struct.

  Returns:
    The 32 bytes starting at offset 1036 of cert (shorter if cert is
    truncated).
  """
  CERT_SUBJECT_OFFSET = 4 + 1032  # Format version and public key come before subject
  CERT_SUBJECT_LENGTH = 32
  return cert[CERT_SUBJECT_OFFSET:CERT_SUBJECT_OFFSET + CERT_SUBJECT_LENGTH]


def SelectMatchingUnlockCredential(all_creds, challenge):
  """Find and return the first UnlockCredentials object whose product ID matches that of the unlock challenge.

  The Product Unlock Key (PUK) certificate's subject field contains the
  SHA256 hash of the product ID that it can be used to unlock. This same
  value (SHA256 hash of the product ID) is contained in the unlock challenge.

  Arguments:
    all_creds: List of UnlockCredentials objects to be searched for a match
      against the given challenge.
    challenge: UnlockChallenge object created from challenge obtained via
      'fastboot oem at-get-vboot-unlock-challenge'.

  Returns:
    The first matching UnlockCredentials object, or None if nothing matches.
  """
  for creds in all_creds:
    if GetAtxCertificateSubject(creds.unlock_cert) == challenge.product_id_hash:
      return creds
  return None


def MakeAtxUnlockCredential(creds, challenge, out_file):
  """Simple reimplementation of 'avbtool make_atx_unlock_credential'.

  Generates an Android Things authenticated unlock credential to authorize
  unlocking AVB on a device.

  This is reimplemented locally for simplicity, which avoids the need to bundle
  this tool with the full avbtool. avbtool also uses openssl by default whereas
  this uses PyCrypto, which makes it easier to support Windows since there are
  no officially supported openssl binary distributions.

  Arguments:
    creds: UnlockCredentials object wrapping the PIK certificate, PUK
      certificate, and PUK private key.
    challenge: UnlockChallenge object created from challenge obtained via
      'fastboot oem at-get-vboot-unlock-challenge'.
    out_file: Output filename to write the AvbAtxUnlockCredential struct to.
  """
  # The response is a PKCS#1 v1.5 signature over the SHA512 of the challenge.
  challenge_hash = SHA512.new(challenge.challenge_data)
  signer = PKCS1_v1_5.new(creds.unlock_key)
  signature = signer.sign(challenge_hash)

  with open(out_file, 'wb') as out:
    out.write(struct.pack('<I', 1))  # Format Version
    out.write(creds.intermediate_cert)
    out.write(creds.unlock_cert)
    out.write(signature)


def AuthenticatedUnlock(all_creds, serial=None, verbose=False):
  """Performs an authenticated AVB unlock of a device over fastboot.

  Arguments:
    all_creds: List of UnlockCredentials objects wrapping the PIK certificate,
      PUK certificate, and PUK private key. The list will be searched to find
      matching credentials for the device being unlocked.
    serial: [optional] A device serial number or other valid value to be passed
      to fastboot's '-s' switch to select the device to unlock.
    verbose: [optional] Enable verbose output, which prints the fastboot
      commands and their output as the commands are run.

  Returns:
    True if the device reports itself as AVB unlocked afterwards, False if no
    credential matched, a fastboot command failed, or the device still reports
    locked.
  """

  tempdir = tempfile.mkdtemp()
  try:
    challenge_file = os.path.join(tempdir, 'challenge')
    credential_file = os.path.join(tempdir, 'credential')

    def fastboot_cmd(args):
      return _RunFastboot(
          args, serial=serial, verbose=verbose, echo_prefix='\n$ ')

    try:
      fastboot_cmd(['oem', 'at-get-vboot-unlock-challenge'])
      fastboot_cmd(['get_staged', challenge_file])

      challenge = UnlockChallenge(challenge_file)
      # hexlify() returns bytes; decode so Python 3 does not print "b'...'".
      print('Product ID SHA256 hash = {}'.format(
          binascii.hexlify(challenge.product_id_hash).decode('ascii')))

      selected_cred = SelectMatchingUnlockCredential(all_creds, challenge)
      if not selected_cred:
        print(
            'ERROR: None of the provided unlock credentials match this device.')
        return False
      if selected_cred.source_file:
        print('Found matching unlock credentials: {}'.format(
            selected_cred.source_file))
      MakeAtxUnlockCredential(selected_cred, challenge, credential_file)

      fastboot_cmd(['stage', credential_file])
      fastboot_cmd(['oem', 'at-unlock-vboot'])

      res = fastboot_cmd(['getvar', 'at-vboot-state'])
      if re.search(r'avb-locked(:\s*|=)0', res) is not None:
        print('Device successfully AVB unlocked')
        return True
      else:
        print('ERROR: Commands succeeded but device still locked')
        return False
    except subprocess.CalledProcessError as e:
      _PrintFastbootFailure(e)
      return False
  finally:
    shutil.rmtree(tempdir)


def FindUnlockCredentialsInDirectory(dir, verbose=False):
  """Loads every valid unlock credential zip archive found directly in dir.

  Only regular files directly inside dir are considered (no recursion). Files
  that cannot be read, are not zip archives, or do not contain valid
  credentials are skipped.

  Arguments:
    dir: Path of the directory to scan.
    verbose: [optional] Print a line for each accepted and each ignored file.

  Returns:
    List of UnlockCredentials objects, in sorted filename order.

  Raises:
    ValueError: If dir is not a directory.
  """
  if not os.path.isdir(dir):
    raise ValueError('Not a directory: ' + dir)

  creds = []
  # os.listdir() order is arbitrary; sort so "first match" is deterministic.
  for entry in sorted(os.listdir(dir)):
    path = os.path.join(dir, entry)
    if not os.path.isfile(path):
      continue
    try:
      creds.append(UnlockCredentials.from_credential_archive(path))
      if verbose:
        print('Found valid unlock credential bundle: ' + path)
    except (IOError, ValueError, zipfile.BadZipfile):
      if verbose:
        print(
            "Ignoring file which isn't a valid unlock credential zip bundle: "
            + path)
  return creds


def ClearFactoryPersistentDigest(serial=None, verbose=False):
  """Clears the factory partition persistent digest using fastboot.

  Most of the time this should be cleared when unlocking a device because
  otherwise any attempts to update the factory partition will be rejected once
  the device is again locked, causing confusion. There is no harm to clear this
  digest even if factory partition updates are not planned.

  Arguments:
    serial: [optional] A device serial number or other valid value to be passed
      to fastboot's '-s' switch to select the device to unlock.
    verbose: [optional] Enable verbose output, which prints the fastboot
      commands and their output as the commands are run.

  Returns:
    True if both fastboot commands succeeded, False otherwise.
  """
  # Must be bytes: it is written to a file opened in binary mode, which
  # rejects str on Python 3.
  FACTORY_PERSISTENT_DIGEST_NAME = b'avb.persistent_digest.factory'

  tempdir = tempfile.mkdtemp()
  try:
    digest_data = os.path.join(tempdir, 'digest_data')

    with open(digest_data, 'wb') as out:
      out.write(struct.pack('<I', len(FACTORY_PERSISTENT_DIGEST_NAME)))
      out.write(FACTORY_PERSISTENT_DIGEST_NAME)
      # Sending a zero length digest will clear the existing digest.
      out.write(struct.pack('<I', 0))

    try:
      _RunFastboot(['stage', digest_data], serial=serial, verbose=verbose)
      _RunFastboot(
          ['oem', 'at-write-persistent-digest'], serial=serial, verbose=verbose)
      print("Successfully cleared the factory partition persistent digest.")
      return True
    except subprocess.CalledProcessError as e:
      _PrintFastbootFailure(e)
      print("Warning: Failed to clear factory partition persistent digest.")
      return False

  finally:
    shutil.rmtree(tempdir)


def parse_boolean(value):
  """Converts a command-line boolean spelling to a bool (argparse 'type').

  Arguments:
    value: String such as 'true', 'no', 'on' or '0'; case and surrounding
      whitespace are ignored.

  Returns:
    True or False.

  Raises:
    argparse.ArgumentTypeError: If value is not a recognized spelling.
  """
  normalized = value.strip().lower()
  if normalized in ('true', 't', 'yes', 'y', 'on', '1'):
    return True
  elif normalized in ('false', 'f', 'no', 'n', 'off', '0'):
    return False
  else:
    raise argparse.ArgumentTypeError('Unexpected boolean value: %s' % value)


def main(in_args):
  """Command-line entry point.

  Arguments:
    in_args: List of command-line arguments, excluding the program name. An
      empty list prints the help text.

  Returns:
    Process exit code: 0 on success, 1 on failure. Argument errors exit via
    argparse (SystemExit) instead of returning.
  """
  parser = argparse.ArgumentParser(
      description=HELP_DESCRIPTION,
      usage=HELP_USAGE,
      epilog=HELP_EPILOG,
      formatter_class=argparse.RawDescriptionHelpFormatter)

  # General optional arguments.
  parser.add_argument(
      '-v',
      '--verbose',
      action='store_true',
      help=
      'enable verbose output, e.g. prints fastboot commands and their output')
  parser.add_argument(
      '-s',
      '--serial',
      help=
      "specify device to unlock, either by serial or any other valid value for fastboot's -s arg"
  )
  # argparse runs string defaults/consts through 'type', so both become True.
  parser.add_argument(
      '--clear_factory_digest',
      nargs='?',
      type=parse_boolean,
      default='true',
      const='true',
      help='Defaults to true. Set to false to prevent clearing the factory persistent digest')

  # User must provide either a unlock credential bundle, or the individual files
  # normally contained in such a bundle.
  # argparse doesn't support specifying this argument format - two groups of
  # mutually exclusive arguments, where one group requires all arguments in that
  # group to be specified - so we define them as optional arguments and do the
  # validation ourselves below.

  # Argument group #1 - Unlock credential zip archive(s) (or directory
  # containing multiple such archives)
  parser.add_argument(
      'bundle',
      metavar='unlock_creds.zip',
      nargs='*',
      help=
      'Unlock using a zip bundle/archive of credentials (e.g. from Developer '
      'Console). You can optionally provide multiple archives and/or a '
      'directory of such bundles and the tool will automatically select the '
      'correct one to use based on matching the product ID against the device '
      'being unlocked.')

  # Argument group #2 - Individual credential files
  parser.add_argument(
      '--pik_cert',
      metavar='pik_cert.bin',
      help='Path to product intermediate key (PIK) certificate file')
  parser.add_argument(
      '--puk_cert',
      metavar='puk_cert.bin',
      help='Path to product unlock key (PUK) certificate file')
  parser.add_argument(
      '--puk',
      metavar='puk.pem',
      help='Path to product unlock key in PEM format')

  # Print help if no args given
  args = parser.parse_args(in_args if in_args else ['-h'])

  # Do the custom validation described above.
  if args.pik_cert is not None or args.puk_cert is not None or args.puk is not None:
    # Check mutual exclusion with bundle positional argument
    if args.bundle:
      parser.error(
          'bundle argument is mutually exclusive with --pik_cert, --puk_cert, and --puk'
      )

    # Check for 'mutual inclusion' of individual file options
    if args.pik_cert is None:
      parser.error('--pik_cert is required if --puk_cert or --puk is given')
    if args.puk_cert is None:
      parser.error('--puk_cert is required if --pik_cert or --puk is given')
    if args.puk is None:
      parser.error('--puk is required if --pik_cert or --puk_cert is given')
  elif not args.bundle:
    parser.error(
        'must provide either credentials bundle or individual credential files')

  # Parse arguments into UnlockCredentials objects. Unreadable or invalid
  # credential files are reported as argument errors rather than tracebacks.
  if args.bundle:
    creds = []
    for path in args.bundle:
      if os.path.isfile(path):
        try:
          creds.append(UnlockCredentials.from_credential_archive(path))
        except (IOError, ValueError, zipfile.BadZipfile) as e:
          parser.error(
              "'{}' is not a valid unlock credential archive: {}".format(
                  path, e))
      elif os.path.isdir(path):
        creds.extend(
            FindUnlockCredentialsInDirectory(path, verbose=args.verbose))
      else:
        parser.error("path argument '{}' does not exist".format(path))

    if not creds:
      parser.error('No unlock credentials were found in any of the given paths')
  else:
    try:
      creds = [UnlockCredentials(args.pik_cert, args.puk_cert, args.puk)]
    except (IOError, ValueError) as e:
      parser.error('invalid unlock credentials: {}'.format(e))

  ret = AuthenticatedUnlock(creds, serial=args.serial, verbose=args.verbose)
  # Only touch the persistent digest once the unlock itself has succeeded.
  if ret and args.clear_factory_digest:
    ret = ClearFactoryPersistentDigest(serial=args.serial, verbose=args.verbose)
  return 0 if ret else 1


if __name__ == '__main__':
  sys.exit(main(sys.argv[1:]))