1#!/usr/bin/env python 2# 3# Copyright 2018 The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17"""Unit tests for at_auth_unlock.""" 18 19import argparse 20import filecmp 21import os 22import shutil 23import subprocess 24import unittest 25 26from at_auth_unlock import * 27from Crypto.PublicKey import RSA 28from unittest.mock import patch 29 30 31def dataPath(file): 32 return os.path.join(os.path.dirname(__file__), 'data', file) 33 34 35DATA_FILE_PIK_CERTIFICATE = dataPath('atx_pik_certificate.bin') 36DATA_FILE_PUK_CERTIFICATE = dataPath('atx_puk_certificate.bin') 37DATA_FILE_PUK_KEY = dataPath('testkey_atx_puk.pem') 38DATA_FILE_UNLOCK_CHALLENGE = dataPath('atx_unlock_challenge.bin') 39DATA_FILE_UNLOCK_CREDENTIAL = dataPath('atx_unlock_credential.bin') 40 41 42def createTempZip(contents): 43 tempzip = tempfile.NamedTemporaryFile() 44 with zipfile.ZipFile(tempzip, 'w') as zip: 45 for arcname in contents: 46 zip.write(contents[arcname], arcname) 47 return tempzip 48 49 50def validUnlockCredsZip(): 51 return createTempZip({ 52 'pik_certificate_v1.bin': DATA_FILE_PIK_CERTIFICATE, 53 'puk_certificate_v1.bin': DATA_FILE_PUK_CERTIFICATE, 54 'puk_v1.pem': DATA_FILE_PUK_KEY 55 }) 56 57 58class UnlockCredentialsTest(unittest.TestCase): 59 60 def testFromValidZipArchive(self): 61 with validUnlockCredsZip() as zip: 62 creds = UnlockCredentials.from_credential_archive(zip) 63 self.assertIsNotNone(creds.intermediate_cert) 64 self.assertIsNotNone(creds.unlock_cert) 65 self.assertIsNotNone(creds.unlock_key) 66 67 def testFromInvalidZipArchive(self): 68 with self.assertRaises(zipfile.BadZipfile): 69 UnlockCredentials.from_credential_archive(DATA_FILE_PUK_KEY) 70 71 def testFromArchiveMissingPikCertificate(self): 72 with createTempZip({ 73 'puk_certificate_v1.bin': DATA_FILE_PUK_CERTIFICATE, 74 'puk_v1.pem': DATA_FILE_PUK_KEY 75 }) as zip: 76 with self.assertRaises(ValueError): 77 UnlockCredentials.from_credential_archive(zip) 78 79 def testFromArchiveMissingPukCertificate(self): 80 with createTempZip({ 81 'pik_certificate_v1.bin': DATA_FILE_PIK_CERTIFICATE, 82 'puk_v1.pem': DATA_FILE_PUK_KEY 83 }) as zip: 84 with self.assertRaises(ValueError): 85 UnlockCredentials.from_credential_archive(zip) 86 87 def testFromArchiveMissingPuk(self): 88 with createTempZip({ 89 'pik_certificate_v1.bin': DATA_FILE_PIK_CERTIFICATE, 90 'puk_certificate_v1.bin': DATA_FILE_PUK_CERTIFICATE, 91 }) as zip: 92 with self.assertRaises(ValueError): 93 UnlockCredentials.from_credential_archive(zip) 94 95 def testFromArchiveMultiplePikCertificates(self): 96 with createTempZip({ 97 'pik_certificate_v1.bin': DATA_FILE_PIK_CERTIFICATE, 98 'pik_certificate_v2.bin': DATA_FILE_PIK_CERTIFICATE, 99 'puk_certificate_v1.bin': DATA_FILE_PUK_CERTIFICATE, 100 'puk_v1.pem': DATA_FILE_PUK_KEY 101 }) as zip: 102 with self.assertRaises(ValueError): 103 UnlockCredentials.from_credential_archive(zip) 104 105 def testFromArchiveMultiplePukCertificates(self): 106 with createTempZip({ 107 'pik_certificate_v1.bin': DATA_FILE_PIK_CERTIFICATE, 108 'puk_certificate_v1.bin': DATA_FILE_PUK_CERTIFICATE, 109 'puk_certificate_v2.bin': DATA_FILE_PUK_CERTIFICATE, 110 'puk_v1.pem': DATA_FILE_PUK_KEY 111 }) as zip: 112 with self.assertRaises(ValueError): 113 UnlockCredentials.from_credential_archive(zip) 114 115 def testFromArchiveMultiplePuks(self): 116 with createTempZip({ 117 'pik_certificate_v1.bin': DATA_FILE_PIK_CERTIFICATE, 118 'puk_certificate_v1.bin': DATA_FILE_PUK_CERTIFICATE, 119 'puk_v1.pem': DATA_FILE_PUK_KEY, 120 'puk_v2.pem': DATA_FILE_PUK_KEY 121 }) as zip: 122 with self.assertRaises(ValueError): 123 UnlockCredentials.from_credential_archive(zip) 124 125 def testFromFiles(self): 126 creds = UnlockCredentials( 127 intermediate_cert_file=DATA_FILE_PIK_CERTIFICATE, 128 unlock_cert_file=DATA_FILE_PUK_CERTIFICATE, 129 unlock_key_file=DATA_FILE_PUK_KEY) 130 self.assertIsNotNone(creds.intermediate_cert) 131 self.assertIsNotNone(creds.unlock_cert) 132 self.assertIsNotNone(creds.unlock_key) 133 134 def testInvalidPuk(self): 135 with self.assertRaises(ValueError): 136 UnlockCredentials( 137 intermediate_cert_file=DATA_FILE_PIK_CERTIFICATE, 138 unlock_cert_file=DATA_FILE_PUK_CERTIFICATE, 139 unlock_key_file=DATA_FILE_PUK_CERTIFICATE) 140 141 def testPukNotPrivateKey(self): 142 tempdir = tempfile.mkdtemp() 143 try: 144 with open(DATA_FILE_PUK_KEY, 'rb') as f: 145 key = RSA.importKey(f.read()) 146 pubkey = os.path.join(tempdir, 'pubkey.pub') 147 with open(pubkey, 'wb') as f: 148 f.write(key.publickey().exportKey()) 149 with self.assertRaises(ValueError): 150 UnlockCredentials( 151 intermediate_cert_file=DATA_FILE_PIK_CERTIFICATE, 152 unlock_cert_file=DATA_FILE_PUK_CERTIFICATE, 153 unlock_key_file=pubkey) 154 finally: 155 shutil.rmtree(tempdir) 156 157 def testWrongSizeCerts(self): 158 pik_cert = DATA_FILE_PIK_CERTIFICATE 159 tempdir = tempfile.mkdtemp() 160 try: 161 # Copy a valid cert and truncate a single byte from the end to create a 162 # too-short cert. 163 shortfile = os.path.join(tempdir, 'shortfile.bin') 164 shutil.copy2(pik_cert, shortfile) 165 with open(shortfile, 'ab') as f: 166 f.seek(-1, os.SEEK_END) 167 f.truncate() 168 with self.assertRaises(ValueError): 169 creds = UnlockCredentials( 170 intermediate_cert_file=shortfile, 171 unlock_cert_file=DATA_FILE_PUK_CERTIFICATE, 172 unlock_key_file=DATA_FILE_PUK_KEY) 173 with self.assertRaises(ValueError): 174 creds = UnlockCredentials( 175 intermediate_cert_file=DATA_FILE_PIK_CERTIFICATE, 176 unlock_cert_file=shortfile, 177 unlock_key_file=DATA_FILE_PUK_KEY) 178 179 # Copy a valid cert and append an arbitrary byte on the end to create a 180 # too-long cert. 181 longfile = os.path.join(tempdir, 'longfile.bin') 182 shutil.copy2(pik_cert, longfile) 183 with open(longfile, 'ab') as f: 184 f.write(b'\0') 185 with self.assertRaises(ValueError): 186 creds = UnlockCredentials( 187 intermediate_cert_file=longfile, 188 unlock_cert_file=DATA_FILE_PUK_CERTIFICATE, 189 unlock_key_file=DATA_FILE_PUK_KEY) 190 with self.assertRaises(ValueError): 191 creds = UnlockCredentials( 192 intermediate_cert_file=DATA_FILE_PIK_CERTIFICATE, 193 unlock_cert_file=longfile, 194 unlock_key_file=DATA_FILE_PUK_KEY) 195 finally: 196 shutil.rmtree(tempdir) 197 198 199def writeFullUnlockChallenge(out_file, product_id_hash=None): 200 """Helper function to create a file with a full AvbAtxUnlockChallenge struct. 201 202 Arguments: 203 product_id_hash: [optional] 32 byte value to include in the challenge as the 204 SHA256 hash of the product ID. If not provided, will default to the 205 product ID hash from the subject of DATA_FILE_PUK_CERTIFICATE. 206 """ 207 if product_id_hash is None: 208 with open(DATA_FILE_PUK_CERTIFICATE, 'rb') as f: 209 product_id_hash = GetAtxCertificateSubject(f.read()) 210 assert len(product_id_hash) == 32 211 212 with open(out_file, 'wb') as out: 213 out.write(struct.pack('<I', 1)) 214 out.write(product_id_hash) 215 with open(DATA_FILE_UNLOCK_CHALLENGE, 'rb') as f: 216 out.write(f.read()) 217 218 219class MakeAtxUnlockCredentialTest(unittest.TestCase): 220 221 def testCredentialIsCorrect(self): 222 with validUnlockCredsZip() as zip: 223 creds = UnlockCredentials.from_credential_archive(zip) 224 225 tempdir = tempfile.mkdtemp() 226 try: 227 challenge_file = os.path.join(tempdir, 'challenge') 228 writeFullUnlockChallenge(challenge_file) 229 challenge = UnlockChallenge(challenge_file) 230 out_cred = os.path.join(tempdir, 'credential') 231 232 # Compare unlock credential generated by function with one generated 233 # using 'avbtool make_atx_unlock_credential', to check correctness. 234 MakeAtxUnlockCredential(creds, challenge, out_cred) 235 self.assertTrue(filecmp.cmp(out_cred, DATA_FILE_UNLOCK_CREDENTIAL)) 236 finally: 237 shutil.rmtree(tempdir) 238 239 def testWrongChallengeSize(self): 240 with validUnlockCredsZip() as zip: 241 creds = UnlockCredentials.from_credential_archive(zip) 242 243 tempdir = tempfile.mkdtemp() 244 try: 245 out_cred = os.path.join(tempdir, 'credential') 246 247 # The bundled unlock challenge is just the 16 byte challenge, not the 248 # full AvbAtxUnlockChallenge like this expects. 249 with self.assertRaises(ValueError): 250 challenge = UnlockChallenge(DATA_FILE_UNLOCK_CHALLENGE) 251 MakeAtxUnlockCredential(creds, challenge, out_cred) 252 finally: 253 shutil.rmtree(tempdir) 254 255 256def makeFastbootCommandFake(testcase, 257 expect_serial=None, 258 error_on_command_number=None, 259 product_id_hash=None, 260 stay_locked=False): 261 """Construct a fake fastboot command handler, to be used with unitttest.mock.Mock.side_effect. 262 263 This can be used to create a callable that acts as a fake for a real device 264 responding to the fastboot commands involved in an authenticated unlock. The 265 returned callback is intended to be used with unittest.mock.Mock.side_effect. 266 There are a number of optional arguments here that can be used to customize 267 the behavior of the fake for a specific test. 268 269 Arguments: 270 testcase: unittest.TestCase object for the associated test 271 expect_serial: [optional] Expect (and assert) that the fastboot command 272 specifies a specific device serial to communicate with. 273 error_on_command_number: [optional] Return a fastboot error (non-zero exit 274 code) on the nth (0-based) command handled. 275 stay_locked: [optional] Make the fake report that the device is still locked 276 after an otherwise successful unlock attempt. 277 """ 278 279 def handler(args, *extraArgs, **kwargs): 280 if error_on_command_number is not None: 281 handler.command_counter += 1 282 if handler.command_counter - 1 == error_on_command_number: 283 raise subprocess.CalledProcessError( 284 returncode=1, cmd=args, output=b'Fake: ERROR') 285 286 testcase.assertEqual(args.pop(0), 'fastboot') 287 if expect_serial is not None: 288 # This is a bit fragile in that, in reality, fastboot allows '-s SERIAL' 289 # to not just be the first arguments, but it works for this use case. 290 testcase.assertEqual(args.pop(0), '-s') 291 testcase.assertEqual(args.pop(0), expect_serial) 292 293 if args[0:2] == ['oem', 'at-get-vboot-unlock-challenge']: 294 handler.challenge_staged = True 295 elif args[0] == 'get_staged': 296 if not handler.challenge_staged: 297 raise subprocess.CalledProcessError( 298 returncode=1, cmd=args, output=b'Fake: No data staged') 299 300 writeFullUnlockChallenge(args[1], product_id_hash=product_id_hash) 301 handler.challenge_staged = False 302 elif args[0] == 'stage': 303 handler.staged_file = args[1] 304 elif args[0:2] == ['oem', 'at-unlock-vboot']: 305 if handler.staged_file is None: 306 raise subprocess.CalledProcessError( 307 returncode=1, cmd=args, output=b'Fake: No unlock credential staged') 308 309 # Validate the unlock credential as if this were a test key locked device, 310 # which implies tests that want a successful unlock need to be set up to 311 # use DATA_FILE_PUK_KEY to sign the challenge. Credentials generated using 312 # other keys will be properly rejected. 313 if not filecmp.cmp(handler.staged_file, DATA_FILE_UNLOCK_CREDENTIAL): 314 raise subprocess.CalledProcessError( 315 returncode=1, cmd=args, output=b'Fake: Incorrect unlock credential') 316 317 handler.locked = True if stay_locked else False 318 elif args[0:2] == ['getvar', 'at-vboot-state']: 319 return b'avb-locked: ' + (b'1' if handler.locked else b'0') 320 return b'Fake: OK' 321 322 handler.command_counter = 0 323 handler.challenge_staged = False 324 handler.staged_file = None 325 handler.locked = True 326 return handler 327 328 329class AuthenticatedUnlockTest(unittest.TestCase): 330 331 @patch('subprocess.check_output') 332 def testUnlockWithZipArchive(self, mock_subp_check_output): 333 with validUnlockCredsZip() as zip: 334 mock_subp_check_output.side_effect = makeFastbootCommandFake(self) 335 self.assertEqual(main([zip.name]), 0) 336 self.assertNotEqual(mock_subp_check_output.call_count, 0) 337 338 @patch('subprocess.check_output') 339 def testUnlockDeviceBySerial(self, mock_subp_check_output): 340 with validUnlockCredsZip() as zip: 341 SERIAL = 'abcde12345' 342 mock_subp_check_output.side_effect = makeFastbootCommandFake( 343 self, expect_serial=SERIAL) 344 self.assertEqual(main([zip.name, '-s', SERIAL]), 0) 345 self.assertNotEqual(mock_subp_check_output.call_count, 0) 346 347 @patch('subprocess.check_output') 348 def testUnlockWithIndividualFiles(self, mock_subp_check_output): 349 mock_subp_check_output.side_effect = makeFastbootCommandFake(self) 350 self.assertEqual( 351 main([ 352 '--pik_cert', DATA_FILE_PIK_CERTIFICATE, '--puk_cert', 353 DATA_FILE_PUK_CERTIFICATE, '--puk', DATA_FILE_PUK_KEY 354 ]), 0) 355 self.assertNotEqual(mock_subp_check_output.call_count, 0) 356 357 @patch('subprocess.check_output') 358 def testFastbootError(self, mock_subp_check_output): 359 """Verify that errors are handled properly if fastboot commands error out.""" 360 with validUnlockCredsZip() as zip: 361 for n in range(5): 362 mock_subp_check_output.reset_mock() 363 mock_subp_check_output.side_effect = makeFastbootCommandFake( 364 self, error_on_command_number=n) 365 self.assertNotEqual(main([zip.name]), 0) 366 self.assertNotEqual(mock_subp_check_output.call_count, 0) 367 368 @patch('subprocess.check_output') 369 def testDoesntActuallyUnlock(self, mock_subp_check_output): 370 """Verify fails if fake set to not actually unlock.""" 371 with validUnlockCredsZip() as zip: 372 mock_subp_check_output.side_effect = makeFastbootCommandFake( 373 self, stay_locked=True) 374 self.assertNotEqual(main([zip.name]), 0) 375 self.assertNotEqual(mock_subp_check_output.call_count, 0) 376 377 @patch('subprocess.check_output') 378 def testNoCredentialsMatchDeviceProductID(self, mock_subp_check_output): 379 """Test two cases where fake responds with a challenge that has a product ID hash which doesn't match the credentials used.""" 380 # Case 1: Change the product ID hash that the fake responds with. 381 with validUnlockCredsZip() as zip: 382 mock_subp_check_output.side_effect = makeFastbootCommandFake( 383 self, product_id_hash=b'\x00' * 32) 384 self.assertNotEqual(main([zip.name]), 0) 385 self.assertNotEqual(mock_subp_check_output.call_count, 0) 386 387 # Case 2: Use credentials with a different product ID. 388 with createTempZip({ 389 'pik_certificate_v1.bin': DATA_FILE_PIK_CERTIFICATE, 390 # Note: PIK cert used as PUK cert so subject (i.e. product ID hash) is 391 # different 392 'puk_certificate_v1.bin': DATA_FILE_PIK_CERTIFICATE, 393 'puk_v1.pem': DATA_FILE_PUK_KEY 394 }) as zip: 395 mock_subp_check_output.side_effect = makeFastbootCommandFake(self) 396 self.assertNotEqual(main([zip.name]), 0) 397 self.assertNotEqual(mock_subp_check_output.call_count, 0) 398 399 @patch('subprocess.check_output') 400 def testMatchingCredentialSelectedFromZipArchives(self, 401 mock_subp_check_output): 402 """Test correct credential based on product ID hash used if multiple provided directly through arguments.""" 403 with validUnlockCredsZip() as correctCreds, createTempZip({ 404 'pik_certificate_v1.bin': DATA_FILE_PIK_CERTIFICATE, 405 # Note: PIK cert used as PUK cert so subject (i.e. product ID hash) 406 # doesn't match 407 'puk_certificate_v1.bin': DATA_FILE_PIK_CERTIFICATE, 408 'puk_v1.pem': DATA_FILE_PUK_KEY 409 }) as wrongCreds: 410 mock_subp_check_output.side_effect = makeFastbootCommandFake(self) 411 self.assertEqual(main([wrongCreds.name, correctCreds.name]), 0) 412 self.assertNotEqual(mock_subp_check_output.call_count, 0) 413 414 @patch('subprocess.check_output') 415 def testMatchingCredentialSelectedFromDirectory(self, mock_subp_check_output): 416 """Test correct credential based on product ID hash used if multiple provided indirectly through a directory argument.""" 417 with validUnlockCredsZip() as correctCreds, createTempZip({ 418 'pik_certificate_v1.bin': DATA_FILE_PIK_CERTIFICATE, 419 # Note: PIK cert used as PUK cert so subject (i.e. product ID hash) 420 # doesn't match 421 'puk_certificate_v1.bin': DATA_FILE_PIK_CERTIFICATE, 422 'puk_v1.pem': DATA_FILE_PUK_KEY 423 }) as wrongCreds: 424 tempdir = tempfile.mkdtemp() 425 try: 426 shutil.copy2(correctCreds.name, tempdir) 427 shutil.copy2(wrongCreds.name, tempdir) 428 429 mock_subp_check_output.side_effect = makeFastbootCommandFake(self) 430 self.assertEqual(main([tempdir]), 0) 431 self.assertNotEqual(mock_subp_check_output.call_count, 0) 432 finally: 433 shutil.rmtree(tempdir) 434 435 @patch('subprocess.check_output') 436 def testMatchingCredentialSelectedFromEither(self, mock_subp_check_output): 437 """Test correct credential based on product ID hash used if arguments give some combination of file and directory arguments.""" 438 with validUnlockCredsZip() as correctCreds, createTempZip({ 439 'pik_certificate_v1.bin': DATA_FILE_PIK_CERTIFICATE, 440 # Note: PIK cert used as PUK cert so subject (i.e. product ID hash) 441 # doesn't match 442 'puk_certificate_v1.bin': DATA_FILE_PIK_CERTIFICATE, 443 'puk_v1.pem': DATA_FILE_PUK_KEY 444 }) as wrongCreds: 445 # Case 1: Correct creds in directory, wrong in file arg 446 tempdir = tempfile.mkdtemp() 447 try: 448 shutil.copy2(correctCreds.name, tempdir) 449 450 mock_subp_check_output.side_effect = makeFastbootCommandFake(self) 451 self.assertEqual(main([wrongCreds.name, tempdir]), 0) 452 self.assertNotEqual(mock_subp_check_output.call_count, 0) 453 454 finally: 455 shutil.rmtree(tempdir) 456 457 # Case 2: Correct creds in file arg, wrong in directory 458 tempdir = tempfile.mkdtemp() 459 try: 460 shutil.copy2(wrongCreds.name, tempdir) 461 462 mock_subp_check_output.side_effect = makeFastbootCommandFake(self) 463 self.assertEqual(main([tempdir, correctCreds.name]), 0) 464 self.assertNotEqual(mock_subp_check_output.call_count, 0) 465 466 # Case 2: Correct creds in file arg, wrong in directory 467 finally: 468 shutil.rmtree(tempdir) 469 470 @patch('argparse.ArgumentParser.error') 471 def testArgparseDirectoryWithNoCredentials(self, mock_parser_error): 472 """Test """ 473 tempdir = tempfile.mkdtemp() 474 try: 475 # Make sure random files are ignored. 476 with open(os.path.join(tempdir, 'so_random'), 'w') as f: 477 f.write("I'm a random file") 478 479 mock_parser_error.side_effect = ValueError('ArgumentParser.error') 480 with self.assertRaises(ValueError): 481 main([tempdir]) 482 self.assertEqual(mock_parser_error.call_count, 1) 483 finally: 484 shutil.rmtree(tempdir) 485 486 @patch('argparse.ArgumentParser.error') 487 def testArgparseMutualExclusionArchiveAndFiles(self, mock_parser_error): 488 mock_parser_error.side_effect = ValueError('ArgumentParser.error') 489 with self.assertRaises(ValueError): 490 main(['dummy.zip', '--pik_cert', DATA_FILE_PIK_CERTIFICATE]) 491 self.assertEqual(mock_parser_error.call_count, 1) 492 493 @patch('argparse.ArgumentParser.error') 494 def testArgparseMutualInclusionOfFileArgs(self, mock_parser_error): 495 mock_parser_error.side_effect = ValueError('ArgumentParser.error') 496 with self.assertRaises(ValueError): 497 main(['--pik_cert', 'pik_cert.bin', '--puk_cert', 'puk_cert.bin']) 498 self.assertEqual(mock_parser_error.call_count, 1) 499 500 mock_parser_error.reset_mock() 501 with self.assertRaises(ValueError): 502 main(['--pik_cert', 'pik_cert.bin', '--puk', 'puk.pem']) 503 self.assertEqual(mock_parser_error.call_count, 1) 504 505 mock_parser_error.reset_mock() 506 with self.assertRaises(ValueError): 507 main(['--puk_cert', 'puk_cert.bin', '--puk', 'puk.pem']) 508 self.assertEqual(mock_parser_error.call_count, 1) 509 510 @patch('argparse.ArgumentParser.error') 511 def testArgparseMissingBundleAndFiles(self, mock_parser_error): 512 mock_parser_error.side_effect = ValueError('ArgumentParser.error') 513 with self.assertRaises(ValueError): 514 main(['-s', '1234abcd']) 515 self.assertEqual(mock_parser_error.call_count, 1) 516 517 518if __name__ == '__main__': 519 unittest.main(verbosity=3) 520