1#!/usr/bin/env python 2# 3# Copyright 2018 The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17"""Helper tool for performing an authenticated AVB unlock of an Android Things device. 18 19This tool communicates with an Android Things device over fastboot to perform an 20authenticated AVB unlock. The user provides unlock credentials valid for the 21device they want to unlock, likely obtained from the Android Things Developer 22Console. The tool handles the sequence of fastboot commands to complete the 23challenge-response unlock protocol. 24 25Unlock credentials can be provided to the tool in one of two ways: 26 27 1) by providing paths to the individual credential files using the 28 '--pik_cert', '--puk_cert', and '--puk' command line swtiches, or 29 30 2) by providing a path to a zip archive containing the three credential files, 31 named as follows: 32 - Product Intermediate Key (PIK) certificate: 'pik_certificate.*\.bin' 33 - Product Unlock Key (PUK) certificate: 'puk_certificate.*\.bin' 34 - PUK private key: 'puk.*\.pem' 35 36 You can also provide one or more archives and/or one or more directories 37 containing such zip archives. In either scenario, the tool will search all 38 of the provided credential archives for a match against the product ID of 39 the device being unlocked and automatically use the first match. 40 41Dependencies: 42 - Python 2.7.x, 3.2.x, or newer (for argparse) 43 - PyCrypto 2.5 or newer (for PKCS1_v1_5 and RSA PKCS#8 PEM key import) 44 - Android SDK Platform Tools (for fastboot), in PATH 45 - https://developer.android.com/studio/releases/platform-tools 46""" 47 48HELP_DESCRIPTION = """Performs an authenticated AVB unlock of an Android Things device over 49fastboot, given valid unlock credentials for the device.""" 50 51HELP_USAGE = """ 52 %(prog)s [-h] [-v] [-s SERIAL] unlock_creds.zip [unlock_creds_2.zip ...] 53 %(prog)s --pik_cert pik_cert.bin --puk_cert puk_cert.bin --puk puk.pem""" 54 55HELP_EPILOG = """examples: 56 %(prog)s unlock_creds.zip 57 %(prog)s unlock_creds.zip unlock_creds_2.zip -s SERIAL 58 %(prog)s path_to_dir_with_multiple_unlock_creds/ 59 %(prog)s --pik_cert pik_cert.bin --puk_cert puk_cert.bin --puk puk.pem""" 60 61import sys 62 63ver = sys.version_info 64if (ver[0] < 2) or (ver[0] == 2 and ver[1] < 7) or (ver[0] == 3 and ver[1] < 2): 65 print('This script requires Python 2.7+ or 3.2+') 66 sys.exit(1) 67 68import argparse 69import binascii 70import os 71import re 72import shutil 73import struct 74import subprocess 75import tempfile 76import zipfile 77 78# Requires PyCrypto 2.5 (or newer) for PKCS1_v1_5 and support for importing 79# PEM-encoded RSA keys 80try: 81 from Crypto.Hash import SHA512 82 from Crypto.PublicKey import RSA 83 from Crypto.Signature import PKCS1_v1_5 84except ImportError as e: 85 print('PyCrypto 2.5 or newer required, missing or too old: ' + str(e)) 86 87 88class UnlockCredentials(object): 89 """Helper data container class for the 3 unlock credentials involved in an AVB authenticated unlock operation. 90 91 """ 92 93 def __init__(self, 94 intermediate_cert_file, 95 unlock_cert_file, 96 unlock_key_file, 97 source_file=None): 98 # The certificates are AvbAtxCertificate structs as defined in libavb_atx, 99 # not an X.509 certificate. Do a basic length sanity check when reading 100 # them. 101 EXPECTED_CERTIFICATE_SIZE = 1620 102 103 with open(intermediate_cert_file, 'rb') as f: 104 self._intermediate_cert = f.read() 105 if len(self._intermediate_cert) != EXPECTED_CERTIFICATE_SIZE: 106 raise ValueError('Invalid intermediate key certificate length.') 107 108 with open(unlock_cert_file, 'rb') as f: 109 self._unlock_cert = f.read() 110 if len(self._unlock_cert) != EXPECTED_CERTIFICATE_SIZE: 111 raise ValueError('Invalid product unlock key certificate length.') 112 113 with open(unlock_key_file, 'rb') as f: 114 self._unlock_key = RSA.importKey(f.read()) 115 if not self._unlock_key.has_private(): 116 raise ValueError('Unlock key was not an RSA private key.') 117 118 self._source_file = source_file 119 120 @property 121 def intermediate_cert(self): 122 return self._intermediate_cert 123 124 @property 125 def unlock_cert(self): 126 return self._unlock_cert 127 128 @property 129 def unlock_key(self): 130 return self._unlock_key 131 132 @property 133 def source_file(self): 134 return self._source_file 135 136 @classmethod 137 def from_credential_archive(cls, archive): 138 """Create UnlockCredentials from an unlock credential zip archive. 139 140 The zip archive must contain the following three credential files, named as 141 follows: 142 - Product Intermediate Key (PIK) certificate: 'pik_certificate.*\.bin' 143 - Product Unlock Key (PUK) certificate: 'puk_certificate.*\.bin' 144 - PUK private key: 'puk.*\.pem' 145 146 This uses @contextlib.contextmanager so we can clean up the tempdir created 147 to unpack the zip contents into. 148 149 Arguments: 150 - archive: Filename of zip archive containing unlock credentials. 151 152 Raises: 153 ValueError: If archive is either missing a required file or contains 154 multiple files matching one of the filename formats. 155 """ 156 157 def _find_one_match(contents, regex, desc): 158 r = re.compile(regex) 159 matches = list(filter(r.search, contents)) 160 if not matches: 161 raise ValueError( 162 "Couldn't find {} file (matching regex '{}') in archive {}".format( 163 desc, regex, archive)) 164 elif len(matches) > 1: 165 raise ValueError( 166 "Found multiple files for {} (matching regex '{}') in archive {}" 167 .format(desc, regex, archive)) 168 return matches[0] 169 170 tempdir = tempfile.mkdtemp() 171 try: 172 with zipfile.ZipFile(archive, mode='r') as zip: 173 contents = zip.namelist() 174 175 pik_cert_re = r'^pik_certificate.*\.bin$' 176 pik_cert = _find_one_match(contents, pik_cert_re, 177 'intermediate key (PIK) certificate') 178 179 puk_cert_re = r'^puk_certificate.*\.bin$' 180 puk_cert = _find_one_match(contents, puk_cert_re, 181 'unlock key (PUK) certificate') 182 183 puk_re = r'^puk.*\.pem$' 184 puk = _find_one_match(contents, puk_re, 'unlock key (PUK)') 185 186 zip.extractall(path=tempdir, members=[pik_cert, puk_cert, puk]) 187 188 return cls( 189 intermediate_cert_file=os.path.join(tempdir, pik_cert), 190 unlock_cert_file=os.path.join(tempdir, puk_cert), 191 unlock_key_file=os.path.join(tempdir, puk), 192 source_file=archive) 193 finally: 194 shutil.rmtree(tempdir) 195 196 197class UnlockChallenge(object): 198 """Helper class for parsing the AvbAtxUnlockChallenge struct returned from 'fastboot oem at-get-vboot-unlock-challenge'. 199 200 The file provided to the constructor should be the full 52-byte 201 AvbAtxUnlockChallenge struct, not just the challenge itself. 202 """ 203 204 def __init__(self, challenge_file): 205 CHALLENGE_STRUCT_SIZE = 52 206 PRODUCT_ID_HASH_SIZE = 32 207 CHALLENGE_DATA_SIZE = 16 208 with open(challenge_file, 'rb') as f: 209 data = f.read() 210 if len(data) != CHALLENGE_STRUCT_SIZE: 211 raise ValueError('Invalid unlock challenge length.') 212 213 self._version, self._product_id_hash, self._challenge_data = struct.unpack( 214 '<I{}s{}s'.format(PRODUCT_ID_HASH_SIZE, CHALLENGE_DATA_SIZE), data) 215 216 @property 217 def version(self): 218 return self._version 219 220 @property 221 def product_id_hash(self): 222 return self._product_id_hash 223 224 @property 225 def challenge_data(self): 226 return self._challenge_data 227 228 229def GetAtxCertificateSubject(cert): 230 """Parses and returns the subject field from the given AvbAtxCertificate struct.""" 231 CERT_SUBJECT_OFFSET = 4 + 1032 # Format version and public key come before subject 232 CERT_SUBJECT_LENGTH = 32 233 return cert[CERT_SUBJECT_OFFSET:CERT_SUBJECT_OFFSET + CERT_SUBJECT_LENGTH] 234 235 236def SelectMatchingUnlockCredential(all_creds, challenge): 237 """Find and return the first UnlockCredentials object whose product ID matches that of the unlock challenge. 238 239 The Product Unlock Key (PUK) certificate's subject field contains the 240 SHA256 hash of the product ID that it can be used to unlock. This same 241 value (SHA256 hash of the product ID) is contained in the unlock challenge. 242 243 Arguments: 244 all_creds: List of UnlockCredentials objects to be searched for a match 245 against the given challenge. 246 challenge: UnlockChallenge object created from challenge obtained via 247 'fastboot oem at-get-vboot-unlock-challenge'. 248 """ 249 for creds in all_creds: 250 if GetAtxCertificateSubject(creds.unlock_cert) == challenge.product_id_hash: 251 return creds 252 253 254def MakeAtxUnlockCredential(creds, challenge, out_file): 255 """Simple reimplementation of 'avbtool make_atx_unlock_credential'. 256 257 Generates an Android Things authenticated unlock credential to authorize 258 unlocking AVB on a device. 259 260 This is reimplemented locally for simplicity, which avoids the need to bundle 261 this tool with the full avbtool. avbtool also uses openssl by default whereas 262 this uses PyCrypto, which makes it easier to support Windows since there are 263 no officially supported openssl binary distributions. 264 265 Arguments: 266 creds: UnlockCredentials object wrapping the PIK certificate, PUK 267 certificate, and PUK private key. 268 challenge: UnlockChallenge object created from challenge obtained via 269 'fastboot oem at-get-vboot-unlock-challenge'. 270 out_file: Output filename to write the AvbAtxUnlockCredential struct to. 271 272 Raises: 273 ValueError: If challenge has wrong length. 274 """ 275 hash = SHA512.new(challenge.challenge_data) 276 signer = PKCS1_v1_5.new(creds.unlock_key) 277 signature = signer.sign(hash) 278 279 with open(out_file, 'wb') as out: 280 out.write(struct.pack('<I', 1)) # Format Version 281 out.write(creds.intermediate_cert) 282 out.write(creds.unlock_cert) 283 out.write(signature) 284 285 286def AuthenticatedUnlock(all_creds, serial=None, verbose=False): 287 """Performs an authenticated AVB unlock of a device over fastboot. 288 289 Arguments: 290 all_creds: List of UnlockCredentials objects wrapping the PIK certificate, 291 PUK certificate, and PUK private key. The list will be searched to find 292 matching credentials for the device being unlocked. 293 serial: [optional] A device serial number or other valid value to be passed 294 to fastboot's '-s' switch to select the device to unlock. 295 verbose: [optional] Enable verbose output, which prints the fastboot 296 commands and their output as the commands are run. 297 """ 298 299 tempdir = tempfile.mkdtemp() 300 try: 301 challenge_file = os.path.join(tempdir, 'challenge') 302 credential_file = os.path.join(tempdir, 'credential') 303 304 def fastboot_cmd(args): 305 args = ['fastboot'] + (['-s', serial] if serial else []) + args 306 if verbose: 307 print('\n$ ' + ' '.join(args)) 308 309 out = subprocess.check_output( 310 args, stderr=subprocess.STDOUT).decode('utf-8') 311 312 if verbose: 313 print(out) 314 return out 315 316 try: 317 fastboot_cmd(['oem', 'at-get-vboot-unlock-challenge']) 318 fastboot_cmd(['get_staged', challenge_file]) 319 320 challenge = UnlockChallenge(challenge_file) 321 print('Product ID SHA256 hash = {}'.format( 322 binascii.hexlify(challenge.product_id_hash))) 323 324 selected_cred = SelectMatchingUnlockCredential(all_creds, challenge) 325 if not selected_cred: 326 print( 327 'ERROR: None of the provided unlock credentials match this device.') 328 return False 329 if selected_cred.source_file: 330 print('Found matching unlock credentials: {}'.format( 331 selected_cred.source_file)) 332 MakeAtxUnlockCredential(selected_cred, challenge, credential_file) 333 334 fastboot_cmd(['stage', credential_file]) 335 fastboot_cmd(['oem', 'at-unlock-vboot']) 336 337 res = fastboot_cmd(['getvar', 'at-vboot-state']) 338 if re.search(r'avb-locked(:\s*|=)0', res) is not None: 339 print('Device successfully AVB unlocked') 340 return True 341 else: 342 print('ERROR: Commands succeeded but device still locked') 343 return False 344 except subprocess.CalledProcessError as e: 345 print(e.output.decode('utf-8')) 346 print("Command '{}' returned non-zero exit status {}".format( 347 ' '.join(e.cmd), e.returncode)) 348 return False 349 finally: 350 shutil.rmtree(tempdir) 351 352 353def FindUnlockCredentialsInDirectory(dir, verbose=False): 354 if not os.path.isdir(dir): 355 raise ValueError('Not a directory: ' + dir) 356 357 creds = [] 358 for file in os.listdir(dir): 359 path = os.path.join(dir, file) 360 if os.path.isfile(path): 361 try: 362 creds.append(UnlockCredentials.from_credential_archive(path)) 363 if verbose: 364 print('Found valid unlock credential bundle: ' + path) 365 except (IOError, ValueError, zipfile.BadZipfile) as e: 366 if verbose: 367 print( 368 "Ignoring file which isn't a valid unlock credential zip bundle: " 369 + path) 370 return creds 371 372 373def main(in_args): 374 parser = argparse.ArgumentParser( 375 description=HELP_DESCRIPTION, 376 usage=HELP_USAGE, 377 epilog=HELP_EPILOG, 378 formatter_class=argparse.RawDescriptionHelpFormatter) 379 380 # General optional arguments. 381 parser.add_argument( 382 '-v', 383 '--verbose', 384 action='store_true', 385 help= 386 'enable verbose output, e.g. prints fastboot commands and their output') 387 parser.add_argument( 388 '-s', 389 '--serial', 390 help= 391 "specify device to unlock, either by serial or any other valid value for fastboot's -s arg" 392 ) 393 394 # User must provide either a unlock credential bundle, or the individual files 395 # normally contained in such a bundle. 396 # argparse doesn't support specifying this argument format - two groups of 397 # mutually exclusive arguments, where one group requires all arguments in that 398 # group to be specified - so we define them as optional arguments and do the 399 # validation ourselves below. 400 401 # Argument group #1 - Unlock credential zip archive(s) (or directory 402 # containing multiple such archives) 403 parser.add_argument( 404 'bundle', 405 metavar='unlock_creds.zip', 406 nargs='*', 407 help= 408 'Unlock using a zip bundle/archive of credentials (e.g. from Developer ' 409 'Console). You can optionally provide multiple archives and/or a ' 410 'directory of such bundles and the tool will automatically select the ' 411 'correct one to use based on matching the product ID against the device ' 412 'being unlocked.') 413 414 # Argument group #2 - Individual credential files 415 parser.add_argument( 416 '--pik_cert', 417 metavar='pik_cert.bin', 418 help='Path to product intermediate key (PIK) certificate file') 419 parser.add_argument( 420 '--puk_cert', 421 metavar='puk_cert.bin', 422 help='Path to product unlock key (PUK) certificate file') 423 parser.add_argument( 424 '--puk', 425 metavar='puk.pem', 426 help='Path to product unlock key in PEM format') 427 428 # Print help if no args given 429 args = parser.parse_args(in_args if in_args else ['-h']) 430 431 # Do the custom validation described above. 432 if args.pik_cert is not None or args.puk_cert is not None or args.puk is not None: 433 # Check mutual exclusion with bundle positional argument 434 if len(args.bundle): 435 parser.error( 436 'bundle argument is mutually exclusive with --pik_cert, --puk_cert, and --puk' 437 ) 438 439 # Check for 'mutual inclusion' of individual file options 440 if args.pik_cert is None: 441 parser.error("--pik_cert is required if --puk_cert or --puk' is given") 442 if args.puk_cert is None: 443 parser.error("--puk_cert is required if --pik_cert or --puk' is given") 444 if args.puk is None: 445 parser.error("--puk is required if --pik_cert or --puk_cert' is given") 446 elif not len(args.bundle): 447 parser.error( 448 'must provide either credentials bundle or individual credential files') 449 450 # Parse arguments into UnlockCredentials objects 451 if len(args.bundle): 452 creds = [] 453 for path in args.bundle: 454 if os.path.isfile(path): 455 creds.append(UnlockCredentials.from_credential_archive(path)) 456 elif os.path.isdir(path): 457 creds.extend( 458 FindUnlockCredentialsInDirectory(path, verbose=args.verbose)) 459 else: 460 parser.error("path argument '{}' does not exist".format(path)) 461 462 if len(creds) == 0: 463 parser.error('No unlock credentials were found in any of the given paths') 464 else: 465 creds = [UnlockCredentials(args.pik_cert, args.puk_cert, args.puk)] 466 467 ret = AuthenticatedUnlock(creds, serial=args.serial, verbose=args.verbose) 468 return 0 if ret else 1 469 470 471if __name__ == '__main__': 472 sys.exit(main(sys.argv[1:])) 473