• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python
2#
3# Copyright 2018 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7#
8# You may obtain a copy of the License at
9#
10#      http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17"""Unit tests for at_auth_unlock."""
18
19import argparse
20import filecmp
21import os
22import shutil
23import subprocess
24import unittest
25
26from at_auth_unlock import *
27from Crypto.PublicKey import RSA
28from unittest.mock import patch
29
30
31def dataPath(file):
32  return os.path.join(os.path.dirname(__file__), 'data', file)
33
34
35DATA_FILE_PIK_CERTIFICATE = dataPath('atx_pik_certificate.bin')
36DATA_FILE_PUK_CERTIFICATE = dataPath('atx_puk_certificate.bin')
37DATA_FILE_PUK_KEY = dataPath('testkey_atx_puk.pem')
38DATA_FILE_UNLOCK_CHALLENGE = dataPath('atx_unlock_challenge.bin')
39DATA_FILE_UNLOCK_CREDENTIAL = dataPath('atx_unlock_credential.bin')
40
41
42def createTempZip(contents):
43  tempzip = tempfile.NamedTemporaryFile()
44  with zipfile.ZipFile(tempzip, 'w') as zip:
45    for arcname in contents:
46      zip.write(contents[arcname], arcname)
47  return tempzip
48
49
50def validUnlockCredsZip():
51  return createTempZip({
52      'pik_certificate_v1.bin': DATA_FILE_PIK_CERTIFICATE,
53      'puk_certificate_v1.bin': DATA_FILE_PUK_CERTIFICATE,
54      'puk_v1.pem': DATA_FILE_PUK_KEY
55  })
56
57
58class UnlockCredentialsTest(unittest.TestCase):
59
60  def testFromValidZipArchive(self):
61    with validUnlockCredsZip() as zip:
62      creds = UnlockCredentials.from_credential_archive(zip)
63      self.assertIsNotNone(creds.intermediate_cert)
64      self.assertIsNotNone(creds.unlock_cert)
65      self.assertIsNotNone(creds.unlock_key)
66
67  def testFromInvalidZipArchive(self):
68    with self.assertRaises(zipfile.BadZipfile):
69      UnlockCredentials.from_credential_archive(DATA_FILE_PUK_KEY)
70
71  def testFromArchiveMissingPikCertificate(self):
72    with createTempZip({
73        'puk_certificate_v1.bin': DATA_FILE_PUK_CERTIFICATE,
74        'puk_v1.pem': DATA_FILE_PUK_KEY
75    }) as zip:
76      with self.assertRaises(ValueError):
77        UnlockCredentials.from_credential_archive(zip)
78
79  def testFromArchiveMissingPukCertificate(self):
80    with createTempZip({
81        'pik_certificate_v1.bin': DATA_FILE_PIK_CERTIFICATE,
82        'puk_v1.pem': DATA_FILE_PUK_KEY
83    }) as zip:
84      with self.assertRaises(ValueError):
85        UnlockCredentials.from_credential_archive(zip)
86
87  def testFromArchiveMissingPuk(self):
88    with createTempZip({
89        'pik_certificate_v1.bin': DATA_FILE_PIK_CERTIFICATE,
90        'puk_certificate_v1.bin': DATA_FILE_PUK_CERTIFICATE,
91    }) as zip:
92      with self.assertRaises(ValueError):
93        UnlockCredentials.from_credential_archive(zip)
94
95  def testFromArchiveMultiplePikCertificates(self):
96    with createTempZip({
97        'pik_certificate_v1.bin': DATA_FILE_PIK_CERTIFICATE,
98        'pik_certificate_v2.bin': DATA_FILE_PIK_CERTIFICATE,
99        'puk_certificate_v1.bin': DATA_FILE_PUK_CERTIFICATE,
100        'puk_v1.pem': DATA_FILE_PUK_KEY
101    }) as zip:
102      with self.assertRaises(ValueError):
103        UnlockCredentials.from_credential_archive(zip)
104
105  def testFromArchiveMultiplePukCertificates(self):
106    with createTempZip({
107        'pik_certificate_v1.bin': DATA_FILE_PIK_CERTIFICATE,
108        'puk_certificate_v1.bin': DATA_FILE_PUK_CERTIFICATE,
109        'puk_certificate_v2.bin': DATA_FILE_PUK_CERTIFICATE,
110        'puk_v1.pem': DATA_FILE_PUK_KEY
111    }) as zip:
112      with self.assertRaises(ValueError):
113        UnlockCredentials.from_credential_archive(zip)
114
115  def testFromArchiveMultiplePuks(self):
116    with createTempZip({
117        'pik_certificate_v1.bin': DATA_FILE_PIK_CERTIFICATE,
118        'puk_certificate_v1.bin': DATA_FILE_PUK_CERTIFICATE,
119        'puk_v1.pem': DATA_FILE_PUK_KEY,
120        'puk_v2.pem': DATA_FILE_PUK_KEY
121    }) as zip:
122      with self.assertRaises(ValueError):
123        UnlockCredentials.from_credential_archive(zip)
124
125  def testFromFiles(self):
126    creds = UnlockCredentials(
127        intermediate_cert_file=DATA_FILE_PIK_CERTIFICATE,
128        unlock_cert_file=DATA_FILE_PUK_CERTIFICATE,
129        unlock_key_file=DATA_FILE_PUK_KEY)
130    self.assertIsNotNone(creds.intermediate_cert)
131    self.assertIsNotNone(creds.unlock_cert)
132    self.assertIsNotNone(creds.unlock_key)
133
134  def testInvalidPuk(self):
135    with self.assertRaises(ValueError):
136      UnlockCredentials(
137          intermediate_cert_file=DATA_FILE_PIK_CERTIFICATE,
138          unlock_cert_file=DATA_FILE_PUK_CERTIFICATE,
139          unlock_key_file=DATA_FILE_PUK_CERTIFICATE)
140
141  def testPukNotPrivateKey(self):
142    tempdir = tempfile.mkdtemp()
143    try:
144      with open(DATA_FILE_PUK_KEY, 'rb') as f:
145        key = RSA.importKey(f.read())
146      pubkey = os.path.join(tempdir, 'pubkey.pub')
147      with open(pubkey, 'wb') as f:
148        f.write(key.publickey().exportKey())
149      with self.assertRaises(ValueError):
150        UnlockCredentials(
151            intermediate_cert_file=DATA_FILE_PIK_CERTIFICATE,
152            unlock_cert_file=DATA_FILE_PUK_CERTIFICATE,
153            unlock_key_file=pubkey)
154    finally:
155      shutil.rmtree(tempdir)
156
157  def testWrongSizeCerts(self):
158    pik_cert = DATA_FILE_PIK_CERTIFICATE
159    tempdir = tempfile.mkdtemp()
160    try:
161      # Copy a valid cert and truncate a single byte from the end to create a
162      # too-short cert.
163      shortfile = os.path.join(tempdir, 'shortfile.bin')
164      shutil.copy2(pik_cert, shortfile)
165      with open(shortfile, 'ab') as f:
166        f.seek(-1, os.SEEK_END)
167        f.truncate()
168      with self.assertRaises(ValueError):
169        creds = UnlockCredentials(
170            intermediate_cert_file=shortfile,
171            unlock_cert_file=DATA_FILE_PUK_CERTIFICATE,
172            unlock_key_file=DATA_FILE_PUK_KEY)
173      with self.assertRaises(ValueError):
174        creds = UnlockCredentials(
175            intermediate_cert_file=DATA_FILE_PIK_CERTIFICATE,
176            unlock_cert_file=shortfile,
177            unlock_key_file=DATA_FILE_PUK_KEY)
178
179      # Copy a valid cert and append an arbitrary byte on the end to create a
180      # too-long cert.
181      longfile = os.path.join(tempdir, 'longfile.bin')
182      shutil.copy2(pik_cert, longfile)
183      with open(longfile, 'ab') as f:
184        f.write(b'\0')
185      with self.assertRaises(ValueError):
186        creds = UnlockCredentials(
187            intermediate_cert_file=longfile,
188            unlock_cert_file=DATA_FILE_PUK_CERTIFICATE,
189            unlock_key_file=DATA_FILE_PUK_KEY)
190      with self.assertRaises(ValueError):
191        creds = UnlockCredentials(
192            intermediate_cert_file=DATA_FILE_PIK_CERTIFICATE,
193            unlock_cert_file=longfile,
194            unlock_key_file=DATA_FILE_PUK_KEY)
195    finally:
196      shutil.rmtree(tempdir)
197
198
199def writeFullUnlockChallenge(out_file, product_id_hash=None):
200  """Helper function to create a file with a full AvbAtxUnlockChallenge struct.
201
202  Arguments:
203    product_id_hash: [optional] 32 byte value to include in the challenge as the
204      SHA256 hash of the product ID. If not provided, will default to the
205      product ID hash from the subject of DATA_FILE_PUK_CERTIFICATE.
206  """
207  if product_id_hash is None:
208    with open(DATA_FILE_PUK_CERTIFICATE, 'rb') as f:
209      product_id_hash = GetAtxCertificateSubject(f.read())
210  assert len(product_id_hash) == 32
211
212  with open(out_file, 'wb') as out:
213    out.write(struct.pack('<I', 1))
214    out.write(product_id_hash)
215    with open(DATA_FILE_UNLOCK_CHALLENGE, 'rb') as f:
216      out.write(f.read())
217
218
219class MakeAtxUnlockCredentialTest(unittest.TestCase):
220
221  def testCredentialIsCorrect(self):
222    with validUnlockCredsZip() as zip:
223      creds = UnlockCredentials.from_credential_archive(zip)
224
225      tempdir = tempfile.mkdtemp()
226      try:
227        challenge_file = os.path.join(tempdir, 'challenge')
228        writeFullUnlockChallenge(challenge_file)
229        challenge = UnlockChallenge(challenge_file)
230        out_cred = os.path.join(tempdir, 'credential')
231
232        # Compare unlock credential generated by function with one generated
233        # using 'avbtool make_atx_unlock_credential', to check correctness.
234        MakeAtxUnlockCredential(creds, challenge, out_cred)
235        self.assertTrue(filecmp.cmp(out_cred, DATA_FILE_UNLOCK_CREDENTIAL))
236      finally:
237        shutil.rmtree(tempdir)
238
239  def testWrongChallengeSize(self):
240    with validUnlockCredsZip() as zip:
241      creds = UnlockCredentials.from_credential_archive(zip)
242
243      tempdir = tempfile.mkdtemp()
244      try:
245        out_cred = os.path.join(tempdir, 'credential')
246
247        # The bundled unlock challenge is just the 16 byte challenge, not the
248        # full AvbAtxUnlockChallenge like this expects.
249        with self.assertRaises(ValueError):
250          challenge = UnlockChallenge(DATA_FILE_UNLOCK_CHALLENGE)
251          MakeAtxUnlockCredential(creds, challenge, out_cred)
252      finally:
253        shutil.rmtree(tempdir)
254
255
256def makeFastbootCommandFake(testcase,
257                            expect_serial=None,
258                            error_on_command_number=None,
259                            product_id_hash=None,
260                            stay_locked=False):
261  """Construct a fake fastboot command handler, to be used with unitttest.mock.Mock.side_effect.
262
263  This can be used to create a callable that acts as a fake for a real device
264  responding to the fastboot commands involved in an authenticated unlock. The
265  returned callback is intended to be used with unittest.mock.Mock.side_effect.
266  There are a number of optional arguments here that can be used to customize
267  the behavior of the fake for a specific test.
268
269  Arguments:
270    testcase: unittest.TestCase object for the associated test
271    expect_serial: [optional] Expect (and assert) that the fastboot command
272      specifies a specific device serial to communicate with.
273    error_on_command_number: [optional] Return a fastboot error (non-zero exit
274      code) on the nth (0-based) command handled.
275    stay_locked: [optional] Make the fake report that the device is still locked
276      after an otherwise successful unlock attempt.
277  """
278
279  def handler(args, *extraArgs, **kwargs):
280    if error_on_command_number is not None:
281      handler.command_counter += 1
282      if handler.command_counter - 1 == error_on_command_number:
283        raise subprocess.CalledProcessError(
284            returncode=1, cmd=args, output=b'Fake: ERROR')
285
286    testcase.assertEqual(args.pop(0), 'fastboot')
287    if expect_serial is not None:
288      # This is a bit fragile in that, in reality, fastboot allows '-s SERIAL'
289      # to not just be the first arguments, but it works for this use case.
290      testcase.assertEqual(args.pop(0), '-s')
291      testcase.assertEqual(args.pop(0), expect_serial)
292
293    if args[0:2] == ['oem', 'at-get-vboot-unlock-challenge']:
294      handler.challenge_staged = True
295    elif args[0] == 'get_staged':
296      if not handler.challenge_staged:
297        raise subprocess.CalledProcessError(
298            returncode=1, cmd=args, output=b'Fake: No data staged')
299
300      writeFullUnlockChallenge(args[1], product_id_hash=product_id_hash)
301      handler.challenge_staged = False
302    elif args[0] == 'stage':
303      handler.staged_file = args[1]
304    elif args[0:2] == ['oem', 'at-unlock-vboot']:
305      if handler.staged_file is None:
306        raise subprocess.CalledProcessError(
307            returncode=1, cmd=args, output=b'Fake: No unlock credential staged')
308
309      # Validate the unlock credential as if this were a test key locked device,
310      # which implies tests that want a successful unlock need to be set up to
311      # use DATA_FILE_PUK_KEY to sign the challenge. Credentials generated using
312      # other keys will be properly rejected.
313      if not filecmp.cmp(handler.staged_file, DATA_FILE_UNLOCK_CREDENTIAL):
314        raise subprocess.CalledProcessError(
315            returncode=1, cmd=args, output=b'Fake: Incorrect unlock credential')
316
317      handler.locked = True if stay_locked else False
318    elif args[0:2] == ['getvar', 'at-vboot-state']:
319      return b'avb-locked: ' + (b'1' if handler.locked else b'0')
320    return b'Fake: OK'
321
322  handler.command_counter = 0
323  handler.challenge_staged = False
324  handler.staged_file = None
325  handler.locked = True
326  return handler
327
328
329class AuthenticatedUnlockTest(unittest.TestCase):
330
331  @patch('subprocess.check_output')
332  def testUnlockWithZipArchive(self, mock_subp_check_output):
333    with validUnlockCredsZip() as zip:
334      mock_subp_check_output.side_effect = makeFastbootCommandFake(self)
335      self.assertEqual(main([zip.name]), 0)
336      self.assertNotEqual(mock_subp_check_output.call_count, 0)
337
338  @patch('subprocess.check_output')
339  def testUnlockDeviceBySerial(self, mock_subp_check_output):
340    with validUnlockCredsZip() as zip:
341      SERIAL = 'abcde12345'
342      mock_subp_check_output.side_effect = makeFastbootCommandFake(
343          self, expect_serial=SERIAL)
344      self.assertEqual(main([zip.name, '-s', SERIAL]), 0)
345      self.assertNotEqual(mock_subp_check_output.call_count, 0)
346
347  @patch('subprocess.check_output')
348  def testUnlockWithIndividualFiles(self, mock_subp_check_output):
349    mock_subp_check_output.side_effect = makeFastbootCommandFake(self)
350    self.assertEqual(
351        main([
352            '--pik_cert', DATA_FILE_PIK_CERTIFICATE, '--puk_cert',
353            DATA_FILE_PUK_CERTIFICATE, '--puk', DATA_FILE_PUK_KEY
354        ]), 0)
355    self.assertNotEqual(mock_subp_check_output.call_count, 0)
356
357  @patch('subprocess.check_output')
358  def testFastbootError(self, mock_subp_check_output):
359    """Verify that errors are handled properly if fastboot commands error out."""
360    with validUnlockCredsZip() as zip:
361      for n in range(5):
362        mock_subp_check_output.reset_mock()
363        mock_subp_check_output.side_effect = makeFastbootCommandFake(
364            self, error_on_command_number=n)
365        self.assertNotEqual(main([zip.name]), 0)
366        self.assertNotEqual(mock_subp_check_output.call_count, 0)
367
368  @patch('subprocess.check_output')
369  def testDoesntActuallyUnlock(self, mock_subp_check_output):
370    """Verify fails if fake set to not actually unlock."""
371    with validUnlockCredsZip() as zip:
372      mock_subp_check_output.side_effect = makeFastbootCommandFake(
373          self, stay_locked=True)
374      self.assertNotEqual(main([zip.name]), 0)
375      self.assertNotEqual(mock_subp_check_output.call_count, 0)
376
377  @patch('subprocess.check_output')
378  def testNoCredentialsMatchDeviceProductID(self, mock_subp_check_output):
379    """Test two cases where fake responds with a challenge that has a product ID hash which doesn't match the credentials used."""
380    # Case 1: Change the product ID hash that the fake responds with.
381    with validUnlockCredsZip() as zip:
382      mock_subp_check_output.side_effect = makeFastbootCommandFake(
383          self, product_id_hash=b'\x00' * 32)
384      self.assertNotEqual(main([zip.name]), 0)
385      self.assertNotEqual(mock_subp_check_output.call_count, 0)
386
387    # Case 2: Use credentials with a different product ID.
388    with createTempZip({
389        'pik_certificate_v1.bin': DATA_FILE_PIK_CERTIFICATE,
390        # Note: PIK cert used as PUK cert so subject (i.e. product ID hash) is
391        # different
392        'puk_certificate_v1.bin': DATA_FILE_PIK_CERTIFICATE,
393        'puk_v1.pem': DATA_FILE_PUK_KEY
394    }) as zip:
395      mock_subp_check_output.side_effect = makeFastbootCommandFake(self)
396      self.assertNotEqual(main([zip.name]), 0)
397      self.assertNotEqual(mock_subp_check_output.call_count, 0)
398
399  @patch('subprocess.check_output')
400  def testMatchingCredentialSelectedFromZipArchives(self,
401                                                    mock_subp_check_output):
402    """Test correct credential based on product ID hash used if multiple provided directly through arguments."""
403    with validUnlockCredsZip() as correctCreds, createTempZip({
404        'pik_certificate_v1.bin': DATA_FILE_PIK_CERTIFICATE,
405        # Note: PIK cert used as PUK cert so subject (i.e. product ID hash)
406        # doesn't match
407        'puk_certificate_v1.bin': DATA_FILE_PIK_CERTIFICATE,
408        'puk_v1.pem': DATA_FILE_PUK_KEY
409    }) as wrongCreds:
410      mock_subp_check_output.side_effect = makeFastbootCommandFake(self)
411      self.assertEqual(main([wrongCreds.name, correctCreds.name]), 0)
412      self.assertNotEqual(mock_subp_check_output.call_count, 0)
413
414  @patch('subprocess.check_output')
415  def testMatchingCredentialSelectedFromDirectory(self, mock_subp_check_output):
416    """Test correct credential based on product ID hash used if multiple provided indirectly through a directory argument."""
417    with validUnlockCredsZip() as correctCreds, createTempZip({
418        'pik_certificate_v1.bin': DATA_FILE_PIK_CERTIFICATE,
419        # Note: PIK cert used as PUK cert so subject (i.e. product ID hash)
420        # doesn't match
421        'puk_certificate_v1.bin': DATA_FILE_PIK_CERTIFICATE,
422        'puk_v1.pem': DATA_FILE_PUK_KEY
423    }) as wrongCreds:
424      tempdir = tempfile.mkdtemp()
425      try:
426        shutil.copy2(correctCreds.name, tempdir)
427        shutil.copy2(wrongCreds.name, tempdir)
428
429        mock_subp_check_output.side_effect = makeFastbootCommandFake(self)
430        self.assertEqual(main([tempdir]), 0)
431        self.assertNotEqual(mock_subp_check_output.call_count, 0)
432      finally:
433        shutil.rmtree(tempdir)
434
435  @patch('subprocess.check_output')
436  def testMatchingCredentialSelectedFromEither(self, mock_subp_check_output):
437    """Test correct credential based on product ID hash used if arguments give some combination of file and directory arguments."""
438    with validUnlockCredsZip() as correctCreds, createTempZip({
439        'pik_certificate_v1.bin': DATA_FILE_PIK_CERTIFICATE,
440        # Note: PIK cert used as PUK cert so subject (i.e. product ID hash)
441        # doesn't match
442        'puk_certificate_v1.bin': DATA_FILE_PIK_CERTIFICATE,
443        'puk_v1.pem': DATA_FILE_PUK_KEY
444    }) as wrongCreds:
445      # Case 1: Correct creds in directory, wrong in file arg
446      tempdir = tempfile.mkdtemp()
447      try:
448        shutil.copy2(correctCreds.name, tempdir)
449
450        mock_subp_check_output.side_effect = makeFastbootCommandFake(self)
451        self.assertEqual(main([wrongCreds.name, tempdir]), 0)
452        self.assertNotEqual(mock_subp_check_output.call_count, 0)
453
454      finally:
455        shutil.rmtree(tempdir)
456
457      # Case 2: Correct creds in file arg, wrong in directory
458      tempdir = tempfile.mkdtemp()
459      try:
460        shutil.copy2(wrongCreds.name, tempdir)
461
462        mock_subp_check_output.side_effect = makeFastbootCommandFake(self)
463        self.assertEqual(main([tempdir, correctCreds.name]), 0)
464        self.assertNotEqual(mock_subp_check_output.call_count, 0)
465
466        # Case 2: Correct creds in file arg, wrong in directory
467      finally:
468        shutil.rmtree(tempdir)
469
470  @patch('argparse.ArgumentParser.error')
471  def testArgparseDirectoryWithNoCredentials(self, mock_parser_error):
472    """Test """
473    tempdir = tempfile.mkdtemp()
474    try:
475      # Make sure random files are ignored.
476      with open(os.path.join(tempdir, 'so_random'), 'w') as f:
477        f.write("I'm a random file")
478
479      mock_parser_error.side_effect = ValueError('ArgumentParser.error')
480      with self.assertRaises(ValueError):
481        main([tempdir])
482      self.assertEqual(mock_parser_error.call_count, 1)
483    finally:
484      shutil.rmtree(tempdir)
485
486  @patch('argparse.ArgumentParser.error')
487  def testArgparseMutualExclusionArchiveAndFiles(self, mock_parser_error):
488    mock_parser_error.side_effect = ValueError('ArgumentParser.error')
489    with self.assertRaises(ValueError):
490      main(['dummy.zip', '--pik_cert', DATA_FILE_PIK_CERTIFICATE])
491    self.assertEqual(mock_parser_error.call_count, 1)
492
493  @patch('argparse.ArgumentParser.error')
494  def testArgparseMutualInclusionOfFileArgs(self, mock_parser_error):
495    mock_parser_error.side_effect = ValueError('ArgumentParser.error')
496    with self.assertRaises(ValueError):
497      main(['--pik_cert', 'pik_cert.bin', '--puk_cert', 'puk_cert.bin'])
498    self.assertEqual(mock_parser_error.call_count, 1)
499
500    mock_parser_error.reset_mock()
501    with self.assertRaises(ValueError):
502      main(['--pik_cert', 'pik_cert.bin', '--puk', 'puk.pem'])
503    self.assertEqual(mock_parser_error.call_count, 1)
504
505    mock_parser_error.reset_mock()
506    with self.assertRaises(ValueError):
507      main(['--puk_cert', 'puk_cert.bin', '--puk', 'puk.pem'])
508    self.assertEqual(mock_parser_error.call_count, 1)
509
510  @patch('argparse.ArgumentParser.error')
511  def testArgparseMissingBundleAndFiles(self, mock_parser_error):
512    mock_parser_error.side_effect = ValueError('ArgumentParser.error')
513    with self.assertRaises(ValueError):
514      main(['-s', '1234abcd'])
515    self.assertEqual(mock_parser_error.call_count, 1)
516
517
518if __name__ == '__main__':
519  unittest.main(verbosity=3)
520