# Copyright 2017 The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Unit test for atft manager.""" import base64 import unittest import atftman from atftman import EncryptionAlgorithm from atftman import ProductInfo from atftman import ProvisionState from atftman import ProvisionStatus from fastboot_exceptions import DeviceNotFoundException from fastboot_exceptions import FastbootFailure from fastboot_exceptions import NoAlgorithmAvailableException from fastboot_exceptions import ProductAttributesFileFormatError from fastboot_exceptions import ProductNotSpecifiedException from mock import call from mock import MagicMock from mock import patch files = [] class AtftManTest(unittest.TestCase): ATFA_TEST_SERIAL = 'ATFA_TEST_SERIAL' TEST_TMP_FOLDER = '/tmp/TMPTEST/' TEST_SERIAL = 'TEST_SERIAL' TEST_SERIAL2 = 'TEST_SERIAL2' TEST_SERIAL3 = 'TEST_SERIAL3' TEST_UUID = 'TEST-UUID' TEST_LOCATION = 'BUS1-PORT1' TEST_LOCATION2 = 'BUS2-PORT1' TEST_LOCATION3 = 'BUS1-PORT2' TEST_ID = '00000000000000000000000000000000' TEST_ID_ARRAY = bytearray.fromhex(TEST_ID) TEST_NAME = 'name' TEST_FILE_NAME = 'filename' TEST_ATTRIBUTE_ARRAY = bytearray(1052) TEST_VBOOT_KEY_ARRAY = bytearray(128) TEST_ATTRIBUTE_STRING = base64.standard_b64encode(TEST_ATTRIBUTE_ARRAY) TEST_VBOOT_KEY_STRING = base64.standard_b64encode(TEST_VBOOT_KEY_ARRAY) class FastbootDeviceTemplate(object): @staticmethod def ListDevices(): pass def __init__(self, serial_number): self.serial_number = serial_number def Oem(self, oem_command, err_to_out): pass def Upload(self, file_path): pass def GetVar(self, var): pass def Download(self, file_path): pass def Disconnect(self): pass def GetHostOs(self): return 'Windows' def __del__(self): pass def MockInit(self, serial_number): mock_instance = MagicMock() mock_instance.serial_number = serial_number mock_instance.GetHostOs = MagicMock() mock_instance.GetHostOs.return_value = 'Windows' return mock_instance def setUp(self): self.mock_serial_mapper = MagicMock() self.mock_serial_instance = MagicMock() self.mock_serial_mapper.return_value = self.mock_serial_instance self.mock_serial_instance.get_serial_map.return_value = [] self.status_map = {} self.mock_timer_instance = None self.configs = {} self.configs['ATFA_REBOOT_TIMEOUT'] = 30 self.configs['DEFAULT_KEY_THRESHOLD'] = 100 # Test AtftManager.ListDevices class MockInstantTimer(object): def __init__(self, timeout, callback): self.timeout = timeout self.callback = callback def start(self): self.callback() def MockCreateInstantTimer(self, timeout, callback): return self.MockInstantTimer(timeout, callback) @patch('threading.Timer') @patch('__main__.AtftManTest.FastbootDeviceTemplate.ListDevices') def testListDevicesNormal(self, mock_list_devices, mock_create_timer): mock_create_timer.side_effect = self.MockCreateInstantTimer atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, self.mock_serial_mapper, self.configs) atft_manager._GetOs = MagicMock() atft_manager._SetOs = MagicMock() atft_manager._GetOs.return_value = 'Windows' mock_list_devices.return_value = [self.TEST_SERIAL, self.ATFA_TEST_SERIAL] atft_manager.ListDevices() atft_manager.ListDevices() self.assertEqual(atft_manager.atfa_dev.serial_number, self.ATFA_TEST_SERIAL) self.assertEqual(1, len(atft_manager.target_devs)) self.assertEqual(atft_manager.target_devs[0].serial_number, self.TEST_SERIAL) self.assertEqual(None, atft_manager._atfa_dev_setting) self.assertEqual(True, atft_manager._atfa_reboot_lock.acquire(False)) atft_manager._SetOs.assert_not_called() @patch('threading.Timer') @patch('__main__.AtftManTest.FastbootDeviceTemplate.ListDevices') def testListDevicesNormalSetOs(self, mock_list_devices, mock_create_timer): mock_create_timer.side_effect = self.MockCreateInstantTimer atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, self.mock_serial_mapper, self.configs) atft_manager._GetOs = MagicMock() atft_manager._SetOs = MagicMock() atft_manager._GetOs.return_value = 'Linux' mock_list_devices.return_value = [self.TEST_SERIAL, self.ATFA_TEST_SERIAL] atft_manager.ListDevices() atft_manager.ListDevices() atft_manager._GetOs.assert_called() atft_manager._SetOs.assert_called_once_with( atft_manager.atfa_dev, 'Windows') self.assertEqual(atft_manager.atfa_dev.serial_number, self.ATFA_TEST_SERIAL) @patch('threading.Timer') @patch('__main__.AtftManTest.FastbootDeviceTemplate.ListDevices') def testListDevicesATFA(self, mock_list_devices, mock_create_timer): mock_create_timer.side_effect = self.MockCreateInstantTimer atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, self.mock_serial_mapper, self.configs) atft_manager._GetOs = MagicMock() atft_manager._GetOs.return_value = 'Windows' mock_list_devices.return_value = [self.ATFA_TEST_SERIAL] atft_manager.ListDevices() atft_manager.ListDevices() self.assertEqual(atft_manager.atfa_dev.serial_number, self.ATFA_TEST_SERIAL) self.assertEqual(0, len(atft_manager.target_devs)) @patch('threading.Timer') @patch('__main__.AtftManTest.FastbootDeviceTemplate.ListDevices') def testListDevicesTarget(self, mock_list_devices, mock_create_timer): mock_create_timer.side_effect = self.MockCreateInstantTimer atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, self.mock_serial_mapper, self.configs) atft_manager._GetOs = MagicMock() atft_manager._GetOs.return_value = 'Windows' mock_list_devices.return_value = [self.TEST_SERIAL] atft_manager.ListDevices() atft_manager.ListDevices() self.assertEqual(atft_manager.atfa_dev, None) self.assertEqual(1, len(atft_manager.target_devs)) self.assertEqual(atft_manager.target_devs[0].serial_number, self.TEST_SERIAL) @patch('threading.Timer') @patch('__main__.AtftManTest.FastbootDeviceTemplate.ListDevices') def testListDevicesMultipleTargets(self, mock_list_devices, mock_create_timer): mock_create_timer.side_effect = self.MockCreateInstantTimer atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, self.mock_serial_mapper, self.configs) atft_manager._GetOs = MagicMock() atft_manager._GetOs.return_value = 'Windows' mock_list_devices.return_value = [self.TEST_SERIAL, self.TEST_SERIAL2] atft_manager.ListDevices() atft_manager.ListDevices() self.assertEqual(atft_manager.atfa_dev, None) self.assertEqual(2, len(atft_manager.target_devs)) self.assertEqual(atft_manager.target_devs[0].serial_number, self.TEST_SERIAL) self.assertEqual(atft_manager.target_devs[1].serial_number, self.TEST_SERIAL2) @patch('threading.Timer') def testListDevicesChangeNorm(self, mock_create_timer): mock_create_timer.side_effect = self.MockCreateInstantTimer mock_fastboot = MagicMock() mock_fastboot.side_effect = self.MockInit atft_manager = atftman.AtftManager(mock_fastboot, self.mock_serial_mapper, self.configs) atft_manager._GetOs = MagicMock() atft_manager._GetOs.return_value = 'Windows' mock_fastboot.ListDevices.return_value = [self.ATFA_TEST_SERIAL] atft_manager.ListDevices() atft_manager.ListDevices() mock_fastboot.ListDevices.return_value = [self.TEST_SERIAL] atft_manager.ListDevices() atft_manager.ListDevices() self.assertEqual(atft_manager.atfa_dev, None) self.assertEqual(1, len(atft_manager.target_devs)) self.assertEqual(atft_manager.target_devs[0].serial_number, self.TEST_SERIAL) self.assertEqual(2, mock_fastboot.call_count) @patch('threading.Timer') def testListDevicesChangeAdd(self, mock_create_timer): mock_create_timer.side_effect = self.MockCreateInstantTimer mock_fastboot = MagicMock() mock_fastboot.side_effect = self.MockInit atft_manager = atftman.AtftManager(mock_fastboot, self.mock_serial_mapper, self.configs) atft_manager._GetOs = MagicMock() atft_manager._GetOs.return_value = 'Windows' mock_fastboot.ListDevices.return_value = [self.ATFA_TEST_SERIAL] atft_manager.ListDevices() atft_manager.ListDevices() mock_fastboot.ListDevices.return_value = [ self.ATFA_TEST_SERIAL, self.TEST_SERIAL ] atft_manager.ListDevices() atft_manager.ListDevices() self.assertEqual(atft_manager.atfa_dev.serial_number, self.ATFA_TEST_SERIAL) self.assertEqual(1, len(atft_manager.target_devs)) self.assertEqual(atft_manager.target_devs[0].serial_number, self.TEST_SERIAL) self.assertEqual(2, mock_fastboot.call_count) @patch('threading.Timer') def testListDevicesChangeAddATFA(self, mock_create_timer): mock_create_timer.side_effect = self.MockCreateInstantTimer mock_fastboot = MagicMock() mock_fastboot.side_effect = self.MockInit atft_manager = atftman.AtftManager(mock_fastboot, self.mock_serial_mapper, self.configs) atft_manager._GetOs = MagicMock() atft_manager._GetOs.return_value = 'Windows' mock_fastboot.ListDevices.return_value = [self.TEST_SERIAL] atft_manager.ListDevices() atft_manager.ListDevices() mock_fastboot.ListDevices.return_value = [ self.ATFA_TEST_SERIAL, self.TEST_SERIAL ] atft_manager.ListDevices() atft_manager.ListDevices() self.assertEqual(atft_manager.atfa_dev.serial_number, self.ATFA_TEST_SERIAL) self.assertEqual(1, len(atft_manager.target_devs)) self.assertEqual(atft_manager.target_devs[0].serial_number, self.TEST_SERIAL) self.assertEqual(2, mock_fastboot.call_count) @patch('threading.Timer') def testListDevicesChangeCommon(self, mock_create_timer): mock_create_timer.side_effect = self.MockCreateInstantTimer mock_fastboot = MagicMock() mock_fastboot.side_effect = self.MockInit atft_manager = atftman.AtftManager(mock_fastboot, self.mock_serial_mapper, self.configs) atft_manager._GetOs = MagicMock() atft_manager._GetOs.return_value = 'Windows' mock_fastboot.ListDevices.return_value = [ self.ATFA_TEST_SERIAL, self.TEST_SERIAL ] atft_manager.ListDevices() atft_manager.ListDevices() mock_fastboot.ListDevices.return_value = [ self.TEST_SERIAL2, self.TEST_SERIAL ] atft_manager.ListDevices() atft_manager.ListDevices() self.assertEqual(atft_manager.atfa_dev, None) self.assertEqual(2, len(atft_manager.target_devs)) self.assertEqual(atft_manager.target_devs[0].serial_number, self.TEST_SERIAL) self.assertEqual(atft_manager.target_devs[1].serial_number, self.TEST_SERIAL2) self.assertEqual(3, mock_fastboot.call_count) @patch('threading.Timer') def testListDevicesChangeCommonATFA(self, mock_create_timer): mock_create_timer.side_effect = self.MockCreateInstantTimer mock_fastboot = MagicMock() mock_fastboot.side_effect = self.MockInit atft_manager = atftman.AtftManager(mock_fastboot, self.mock_serial_mapper, self.configs) atft_manager._GetOs = MagicMock() atft_manager._GetOs.return_value = 'Windows' mock_fastboot.ListDevices.return_value = [ self.ATFA_TEST_SERIAL, self.TEST_SERIAL ] atft_manager.ListDevices() atft_manager.ListDevices() mock_fastboot.ListDevices.return_value = [ self.ATFA_TEST_SERIAL, self.TEST_SERIAL2 ] atft_manager.ListDevices() # First refresh, TEST_SERIAL should disappear self.assertEqual(0, len(atft_manager.target_devs)) # Second refresh, TEST_SERIAL2 should be added atft_manager.ListDevices() self.assertEqual(atft_manager.atfa_dev.serial_number, self.ATFA_TEST_SERIAL) self.assertEqual(1, len(atft_manager.target_devs)) self.assertEqual(atft_manager.target_devs[0].serial_number, self.TEST_SERIAL2) self.assertEqual(3, mock_fastboot.call_count) @patch('threading.Timer') def testListDevicesRemoveATFA(self, mock_create_timer): mock_create_timer.side_effect = self.MockCreateInstantTimer mock_fastboot = MagicMock() mock_fastboot.side_effect = self.MockInit atft_manager = atftman.AtftManager(mock_fastboot, self.mock_serial_mapper, self.configs) atft_manager._GetOs = MagicMock() atft_manager._GetOs.return_value = 'Windows' mock_fastboot.ListDevices.return_value = [ self.ATFA_TEST_SERIAL, self.TEST_SERIAL ] atft_manager.ListDevices() atft_manager.ListDevices() mock_fastboot.ListDevices.return_value = [self.TEST_SERIAL] atft_manager.ListDevices() self.assertEqual(atft_manager.atfa_dev, None) self.assertEqual(1, len(atft_manager.target_devs)) self.assertEqual(atft_manager.target_devs[0].serial_number, self.TEST_SERIAL) @patch('threading.Timer') def testListDevicesRemoveDevice(self, mock_create_timer): mock_create_timer.side_effect = self.MockCreateInstantTimer mock_fastboot = MagicMock() mock_fastboot.side_effect = self.MockInit atft_manager = atftman.AtftManager(mock_fastboot, self.mock_serial_mapper, self.configs) atft_manager._GetOs = MagicMock() atft_manager._GetOs.return_value = 'Windows' mock_fastboot.ListDevices.return_value = [ self.ATFA_TEST_SERIAL, self.TEST_SERIAL ] atft_manager.ListDevices() atft_manager.ListDevices() mock_fastboot.ListDevices.return_value = [self.ATFA_TEST_SERIAL] atft_manager.ListDevices() self.assertEqual(atft_manager.atfa_dev.serial_number, self.ATFA_TEST_SERIAL) self.assertEqual(0, len(atft_manager.target_devs)) @patch('threading.Timer') def testListDevicesPendingRemove(self, mock_create_timer): mock_create_timer.side_effect = self.MockCreateInstantTimer mock_fastboot = MagicMock() mock_fastboot.side_effect = self.MockInit atft_manager = atftman.AtftManager(mock_fastboot, self.mock_serial_mapper, self.configs) atft_manager._GetOs = MagicMock() atft_manager._GetOs.return_value = 'Windows' mock_fastboot.ListDevices.return_value = [self.TEST_SERIAL] atft_manager.ListDevices() # Just appear once, should not be in target device list. self.assertEqual(0, len(atft_manager.target_devs)) @patch('threading.Timer') def testListDevicesPendingAdd(self, mock_create_timer): mock_create_timer.side_effect = self.MockCreateInstantTimer mock_fastboot = MagicMock() mock_fastboot.side_effect = self.MockInit atft_manager = atftman.AtftManager(mock_fastboot, self.mock_serial_mapper, self.configs) atft_manager._GetOs = MagicMock() atft_manager._GetOs.return_value = 'Windows' mock_fastboot.ListDevices.return_value = [self.TEST_SERIAL] atft_manager.ListDevices() self.assertEqual(0, len(atft_manager.target_devs)) mock_fastboot.ListDevices.return_value = [ self.TEST_SERIAL, self.TEST_SERIAL2 ] # TEST_SERIAL appears twice, should be in the list. # TEST_SERIAL2 just appears once, should not be in the list. atft_manager.ListDevices() self.assertEqual(1, len(atft_manager.target_devs)) @patch('threading.Timer') def testListDevicesPendingTemp(self, mock_create_timer): mock_create_timer.side_effect = self.MockCreateInstantTimer mock_fastboot = MagicMock() mock_fastboot.side_effect = self.MockInit atft_manager = atftman.AtftManager(mock_fastboot, self.mock_serial_mapper, self.configs) atft_manager._GetOs = MagicMock() atft_manager._GetOs.return_value = 'Windows' mock_fastboot.ListDevices.return_value = [self.TEST_SERIAL] atft_manager.ListDevices() self.assertEqual(0, len(atft_manager.target_devs)) mock_fastboot.ListDevices.return_value = [self.TEST_SERIAL2] atft_manager.ListDevices() # Nothing appears twice. self.assertEqual(0, len(atft_manager.target_devs)) def mockSetSerialMapper(self, serial_map): self.serial_map = {} for serial in serial_map: self.serial_map[serial.lower()] = serial_map[serial] def mockGetLocation(self, serial): serial_lower = serial.lower() if serial_lower in self.serial_map: return self.serial_map[serial_lower] return None @patch('threading.Timer') def testListDevicesLocation(self, mock_create_timer): mock_create_timer.side_effect = self.MockCreateInstantTimer mock_serial_mapper = MagicMock() smap = { self.ATFA_TEST_SERIAL: self.TEST_LOCATION, self.TEST_SERIAL: self.TEST_LOCATION2 } mock_serial_instance = MagicMock() mock_serial_mapper.return_value = mock_serial_instance mock_serial_instance.refresh_serial_map.side_effect = ( lambda serial_map=smap: self.mockSetSerialMapper(serial_map)) mock_serial_instance.get_location.side_effect = self.mockGetLocation mock_fastboot = MagicMock() mock_fastboot.side_effect = self.MockInit atft_manager = atftman.AtftManager( mock_fastboot, mock_serial_mapper, self.configs) atft_manager._GetOs = MagicMock() atft_manager._GetOs.return_value = 'Windows' mock_fastboot.ListDevices.return_value = [ self.ATFA_TEST_SERIAL, self.TEST_SERIAL ] atft_manager.ListDevices() atft_manager.ListDevices() self.assertEqual(atft_manager.atfa_dev.location, self.TEST_LOCATION) self.assertEqual(atft_manager.target_devs[0].location, self.TEST_LOCATION2) # Test _SortTargetDevices def testSortDevicesDefault(self): mock_serial_mapper = MagicMock() mock_serial_instance = MagicMock() mock_serial_mapper.return_value = mock_serial_instance smap = { self.TEST_SERIAL: self.TEST_LOCATION, self.TEST_SERIAL2: self.TEST_LOCATION2, self.TEST_SERIAL3: self.TEST_LOCATION3 } mock_serial_instance.refresh_serial_map.side_effect = ( lambda serial_map=smap: self.mockSetSerialMapper(serial_map)) mock_serial_instance.get_location.side_effect = self.mockGetLocation mock_fastboot = MagicMock() mock_fastboot.side_effect = self.MockInit atft_manager = atftman.AtftManager( mock_fastboot, mock_serial_mapper, self.configs) mock_fastboot.ListDevices.return_value = [ self.TEST_SERIAL, self.TEST_SERIAL2, self.TEST_SERIAL3 ] atft_manager.ListDevices() atft_manager.ListDevices() self.assertEqual(3, len(atft_manager.target_devs)) self.assertEqual(self.TEST_LOCATION, atft_manager.target_devs[0].location) self.assertEqual(self.TEST_LOCATION3, atft_manager.target_devs[1].location) self.assertEqual(self.TEST_LOCATION2, atft_manager.target_devs[2].location) def testSortDevicesLocation(self): mock_serial_mapper = MagicMock() mock_serial_instance = MagicMock() mock_serial_mapper.return_value = mock_serial_instance smap = { self.TEST_SERIAL: self.TEST_LOCATION, self.TEST_SERIAL2: self.TEST_LOCATION2, self.TEST_SERIAL3: self.TEST_LOCATION3 } mock_serial_instance.refresh_serial_map.side_effect = ( lambda serial_map=smap: self.mockSetSerialMapper(serial_map)) mock_serial_instance.get_location.side_effect = self.mockGetLocation mock_fastboot = MagicMock() mock_fastboot.side_effect = self.MockInit atft_manager = atftman.AtftManager( mock_fastboot, mock_serial_mapper, self.configs) mock_fastboot.ListDevices.return_value = [ self.TEST_SERIAL, self.TEST_SERIAL2, self.TEST_SERIAL3 ] atft_manager.ListDevices(atft_manager.SORT_BY_LOCATION) atft_manager.ListDevices(atft_manager.SORT_BY_LOCATION) self.assertEqual(3, len(atft_manager.target_devs)) self.assertEqual(self.TEST_LOCATION, atft_manager.target_devs[0].location) self.assertEqual(self.TEST_LOCATION3, atft_manager.target_devs[1].location) self.assertEqual(self.TEST_LOCATION2, atft_manager.target_devs[2].location) def testSortDevicesSerial(self): mock_serial_mapper = MagicMock() mock_serial_instance = MagicMock() mock_serial_mapper.return_value = mock_serial_instance smap = { self.TEST_SERIAL: self.TEST_LOCATION, self.TEST_SERIAL2: self.TEST_LOCATION2, self.TEST_SERIAL3: self.TEST_LOCATION3 } mock_serial_instance.refresh_serial_map.side_effect = ( lambda serial_map=smap: self.mockSetSerialMapper(serial_map)) mock_serial_instance.get_location.side_effect = self.mockGetLocation mock_fastboot = MagicMock() mock_fastboot_instance = MagicMock() mock_fastboot.side_effect = self.MockInit mock_fastboot.return_value = mock_fastboot_instance mock_fastboot_instance.GetVar = MagicMock() atft_manager = atftman.AtftManager( mock_fastboot, mock_serial_mapper, self.configs) mock_fastboot.ListDevices.return_value = [ self.TEST_SERIAL2, self.TEST_SERIAL3, self.TEST_SERIAL ] atft_manager.ListDevices(atft_manager.SORT_BY_SERIAL) atft_manager.ListDevices(atft_manager.SORT_BY_SERIAL) self.assertEqual(3, len(atft_manager.target_devs)) self.assertEqual(self.TEST_SERIAL, atft_manager.target_devs[0].serial_number) self.assertEqual(self.TEST_SERIAL2, atft_manager.target_devs[1].serial_number) self.assertEqual(self.TEST_SERIAL3, atft_manager.target_devs[2].serial_number) # Test AtftManager.TransferContent @staticmethod def _AppendFile(file_path): files.append(file_path) @staticmethod def _CheckFile(file_path): assert file_path in files return True @staticmethod def _RemoveFile(file_path): assert file_path in files files.remove(file_path) @patch('os.rmdir') @patch('os.remove') @patch('os.path.exists') @patch('tempfile.mkdtemp') @patch('uuid.uuid1') @patch('__main__.AtftManTest.FastbootDeviceTemplate.Upload') @patch('__main__.AtftManTest.FastbootDeviceTemplate.Download') def testTransferContentNormal(self, mock_download, mock_upload, mock_uuid, mock_create_folder, mock_exists, mock_remove, mock_rmdir): atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, self.mock_serial_mapper, self.configs) # upload (to fs): create a temporary file mock_upload.side_effect = AtftManTest._AppendFile # download (from fs): check if the temporary file exists mock_download.side_effect = AtftManTest._CheckFile mock_exists.side_effect = AtftManTest._CheckFile # remove: remove the file mock_remove.side_effect = AtftManTest._RemoveFile mock_rmdir.side_effect = AtftManTest._RemoveFile mock_create_folder.return_value = self.TEST_TMP_FOLDER files.append(self.TEST_TMP_FOLDER) mock_uuid.return_value = self.TEST_UUID tmp_path = self.TEST_TMP_FOLDER + self.TEST_UUID src = self.FastbootDeviceTemplate(self.TEST_SERIAL) dst = self.FastbootDeviceTemplate(self.TEST_SERIAL) atft_manager.TransferContent(src, dst) src.Upload.assert_called_once_with(tmp_path) src.Download.assert_called_once_with(tmp_path) # we should have no temporary file at the end self.assertTrue(not files) # Test AtftManager._ChooseAlgorithm def testChooseAlgorithm(self): atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, self.mock_serial_mapper, self.configs) p256 = atftman.EncryptionAlgorithm.ALGORITHM_P256 curve = atftman.EncryptionAlgorithm.ALGORITHM_CURVE25519 algorithm = atft_manager._ChooseAlgorithm([p256, curve]) self.assertEqual(curve, algorithm) def testChooseAlgorithmP256(self): atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, self.mock_serial_mapper, self.configs) p256 = atftman.EncryptionAlgorithm.ALGORITHM_P256 algorithm = atft_manager._ChooseAlgorithm([p256]) self.assertEqual(p256, algorithm) def testChooseAlgorithmCurve(self): atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, self.mock_serial_mapper, self.configs) curve = atftman.EncryptionAlgorithm.ALGORITHM_CURVE25519 algorithm = atft_manager._ChooseAlgorithm([curve]) self.assertEqual(curve, algorithm) def testChooseAlgorithmException(self): atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, self.mock_serial_mapper, self.configs) with self.assertRaises(NoAlgorithmAvailableException): atft_manager._ChooseAlgorithm([]) def testChooseAlgorithmExceptionNoAvailable(self): atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, self.mock_serial_mapper, self.configs) with self.assertRaises(NoAlgorithmAvailableException): atft_manager._ChooseAlgorithm(['abcd']) # Test AtftManager._GetAlgorithmList def testGetAlgorithmList(self): mock_target = MagicMock() mock_target.GetVar.return_value = '1:p256,2:curve25519' atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, self.mock_serial_mapper, self.configs) algorithm_list = atft_manager._GetAlgorithmList(mock_target) self.assertEqual(2, len(algorithm_list)) self.assertEqual(1, algorithm_list[0]) self.assertEqual(2, algorithm_list[1]) # Test DeviceInfo.__eq__ def testDeviceInfoEqual(self): test_device1 = atftman.DeviceInfo(None, self.TEST_SERIAL, self.TEST_LOCATION) test_device2 = atftman.DeviceInfo(None, self.TEST_SERIAL2, self.TEST_LOCATION2) test_device3 = atftman.DeviceInfo(None, self.TEST_SERIAL, self.TEST_LOCATION2) test_device4 = atftman.DeviceInfo(None, self.TEST_SERIAL2, self.TEST_LOCATION) test_device5 = atftman.DeviceInfo(None, self.TEST_SERIAL, self.TEST_LOCATION) self.assertEqual(test_device1, test_device5) self.assertNotEqual(test_device1, test_device2) self.assertNotEqual(test_device1, test_device3) self.assertNotEqual(test_device1, test_device4) # Test DeviceInfo.Copy def testDeviceInfoCopy(self): test_device1 = atftman.DeviceInfo(None, self.TEST_SERIAL, self.TEST_LOCATION) test_device2 = atftman.DeviceInfo(None, self.TEST_SERIAL2, self.TEST_LOCATION2) test_device3 = test_device1.Copy() self.assertEqual(test_device3, test_device1) self.assertNotEqual(test_device3, test_device2) # Test AtfaDeviceManager.CheckStatus def testCheckStatus(self): atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, self.mock_serial_mapper, self.configs) mock_atfa_dev = MagicMock() atft_manager.atfa_dev = mock_atfa_dev test_atfa_device_manager = atftman.AtfaDeviceManager(atft_manager) atft_manager.product_info = MagicMock() atft_manager.product_info.product_id = self.TEST_ID mock_atfa_dev.Oem.return_value = 'TEST\n(bootloader) 100\nTEST' test_number = test_atfa_device_manager.CheckStatus() mock_atfa_dev.Oem.assert_called_once_with('num-keys ' + self.TEST_ID, True) self.assertEqual(100, atft_manager.GetATFAKeysLeft()) def testCheckStatusCRLF(self): atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, self.mock_serial_mapper, self.configs) mock_atfa_dev = MagicMock() atft_manager.atfa_dev = mock_atfa_dev test_atfa_device_manager = atftman.AtfaDeviceManager(atft_manager) atft_manager.product_info = MagicMock() atft_manager.product_info.product_id = self.TEST_ID mock_atfa_dev.Oem.return_value = 'TEST\r\n(bootloader) 100\r\nTEST' test_number = test_atfa_device_manager.CheckStatus() mock_atfa_dev.Oem.assert_called_once_with('num-keys ' + self.TEST_ID, True) self.assertEqual(100, atft_manager.GetATFAKeysLeft()) def testCheckStatusNoProductId(self): atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, self.mock_serial_mapper, self.configs) mock_atfa_dev = MagicMock() atft_manager.atfa_dev = mock_atfa_dev atft_manager.product_info = None test_atfa_device_manager = atftman.AtfaDeviceManager(atft_manager) mock_atfa_dev.Oem.return_value = 'TEST\r\n(bootloader) 100\r\nTEST' with self.assertRaises(ProductNotSpecifiedException): test_atfa_device_manager.CheckStatus() def testCheckStatusNoATFA(self): atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, self.mock_serial_mapper, self.configs) atft_manager.atfa_dev = None atft_manager.product_info = MagicMock() atft_manager.product_info.product_id = self.TEST_ID test_atfa_device_manager = atftman.AtfaDeviceManager(atft_manager) with self.assertRaises(DeviceNotFoundException): test_atfa_device_manager.CheckStatus() def testCheckStatusInvalidFormat(self): atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, self.mock_serial_mapper, self.configs) mock_atfa_dev = MagicMock() atft_manager.atfa_dev = mock_atfa_dev test_atfa_device_manager = atftman.AtfaDeviceManager(atft_manager) atft_manager.product_info = MagicMock() atft_manager.product_info.product_id = self.TEST_ID mock_atfa_dev.Oem.return_value = 'TEST\nTEST' with self.assertRaises(FastbootFailure): test_atfa_device_manager.CheckStatus() def testCheckStatusInvalidNumber(self): atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, self.mock_serial_mapper, self.configs) mock_atfa_dev = MagicMock() atft_manager.atfa_dev = mock_atfa_dev test_atfa_device_manager = atftman.AtfaDeviceManager(atft_manager) atft_manager.product_info = MagicMock() atft_manager.product_info.product_id = self.TEST_ID mock_atfa_dev.Oem.return_value = 'TEST\n(bootloader) abcd\nTEST' with self.assertRaises(FastbootFailure): test_atfa_device_manager.CheckStatus() # Test AtftManager.CheckProvisionStatus def MockGetVar(self, variable): return self.status_map.get(variable) def testCheckProvisionStatus(self): atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, self.mock_serial_mapper, self.configs) # All initial state self.status_map = {} self.status_map['at-vboot-state'] = ( '(bootloader) bootloader-locked: 0\n' '(bootloader) bootloader-min-versions: -1,0,3\n' '(bootloader) avb-perm-attr-set: 0\n' '(bootloader) avb-locked: 0\n' '(bootloader) avb-unlock-disabled: 0\n' '(bootloader) avb-min-versions: 0:1,1:1,2:1,4097 :2,4098:2\n') self.status_map['at-attest-uuid'] = '' mock_device = MagicMock() mock_device.GetVar.side_effect = self.MockGetVar atft_manager.CheckProvisionStatus(mock_device) self.assertEqual(ProvisionStatus.IDLE, mock_device.provision_status) # Attestation key provisioned self.status_map['at-attest-uuid'] = self.TEST_UUID atft_manager.CheckProvisionStatus(mock_device) self.assertEqual(ProvisionStatus.PROVISION_SUCCESS, mock_device.provision_status) # AVB locked self.status_map['at-vboot-state'] = ( '(bootloader) bootloader-locked: 0\n' '(bootloader) bootloader-min-versions: -1,0,3\n' '(bootloader) avb-perm-attr-set: 0\n' '(bootloader) avb-locked: 1\n' '(bootloader) avb-unlock-disabled: 0\n' '(bootloader) avb-min-versions: 0:1,1:1,2:1,4097 :2,4098:2\n') self.status_map['at-attest-uuid'] = '' atft_manager.CheckProvisionStatus(mock_device) self.assertEqual(ProvisionStatus.LOCKAVB_SUCCESS, mock_device.provision_status) # Permanent attributes fused self.status_map['at-vboot-state'] = ( '(bootloader) bootloader-locked: 0\n' '(bootloader) bootloader-min-versions: -1,0,3\n' '(bootloader) avb-perm-attr-set: 1\n' '(bootloader) avb-locked: 0\n' '(bootloader) avb-unlock-disabled: 0\n' '(bootloader) avb-min-versions: 0:1,1:1,2:1,4097 :2,4098:2\n') self.status_map['at-attest-uuid'] = '' atft_manager.CheckProvisionStatus(mock_device) self.assertEqual(ProvisionStatus.FUSEATTR_SUCCESS, mock_device.provision_status) # Bootloader locked self.status_map['at-vboot-state'] = ( '(bootloader) bootloader-locked: 1\n' '(bootloader) bootloader-min-versions: -1,0,3\n' '(bootloader) avb-perm-attr-set: 0\n' '(bootloader) avb-locked: 0\n' '(bootloader) avb-unlock-disabled: 0\n' '(bootloader) avb-min-versions: 0:1,1:1,2:1,4097 :2,4098:2\n') self.status_map['at-attest-uuid'] = '' atft_manager.CheckProvisionStatus(mock_device) self.assertEqual(ProvisionStatus.FUSEVBOOT_SUCCESS, mock_device.provision_status) def testCheckProvisionStatusFormat(self): atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, self.mock_serial_mapper, self.configs) # All initial state self.status_map = {} self.status_map['at-vboot-state'] = ( '(bootloader) bootloader-locked=1\n' '(bootloader) bootloader-min-versions=-1,0,3\n' '(bootloader) avb-perm-attr-set=0\n' '(bootloader) avb-locked=0\n' '(bootloader) avb-unlock-disabled=0\n' '(bootloader) avb-min-versions=0:1,1:1,2:1,4097 :2,4098:2\n') self.status_map['at-attest-uuid'] = '' mock_device = MagicMock() mock_device.GetVar.side_effect = self.MockGetVar atft_manager.CheckProvisionStatus(mock_device) self.assertEqual(ProvisionStatus.FUSEVBOOT_SUCCESS, mock_device.provision_status) self.assertEqual(True, mock_device.provision_state.bootloader_locked) self.status_map['at-vboot-state'] = ( '(bootloader) bootloader-locked:1\n' '(bootloader) bootloader-min-versions:-1,0,3\n' '(bootloader) avb-perm-attr-set:0\n' '(bootloader) avb-locked:0\n' '(bootloader) avb-unlock-disabled:0\n' '(bootloader) avb-min-versions: 0:1,1:1,2:1,4097 :2,4098:2\n') self.status_map['at-attest-uuid'] = '' mock_device = MagicMock() mock_device.GetVar.side_effect = self.MockGetVar atft_manager.CheckProvisionStatus(mock_device) self.assertEqual(ProvisionStatus.FUSEVBOOT_SUCCESS, mock_device.provision_status) self.assertEqual(True, mock_device.provision_state.bootloader_locked) self.status_map['at-vboot-state'] = ( '(bootloader) bootloader-locked:\t1\n' '(bootloader) bootloader-min-versions:\t-1,0,3\n' '(bootloader) avb-perm-attr-set:\t0\n' '(bootloader) avb-locked:\t0\n' '(bootloader) avb-unlock-disabled:\t0\n' '(bootloader) avb-min-versions:\t0:1,1:1,2:1,4097 :2,4098:2\n') self.status_map['at-attest-uuid'] = '' mock_device = MagicMock() mock_device.GetVar.side_effect = self.MockGetVar atft_manager.CheckProvisionStatus(mock_device) self.assertEqual(ProvisionStatus.FUSEVBOOT_SUCCESS, mock_device.provision_status) self.assertEqual(True, mock_device.provision_state.bootloader_locked) def testCheckProvisionState(self): atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, self.mock_serial_mapper, self.configs) # All initial state self.status_map = {} self.status_map['at-vboot-state'] = ( '(bootloader) bootloader-locked: 0\n' '(bootloader) bootloader-min-versions: -1,0,3\n' '(bootloader) avb-perm-attr-set: 0\n' '(bootloader) avb-locked: 0\n' '(bootloader) avb-unlock-disabled: 0\n' '(bootloader) avb-min-versions: 0:1,1:1,2:1,4097 :2,4098:2\n') self.status_map['at-attest-uuid'] = '' mock_device = MagicMock() mock_device.GetVar.side_effect = self.MockGetVar atft_manager.CheckProvisionStatus(mock_device) self.assertEqual(False, mock_device.provision_state.bootloader_locked) self.assertEqual(False, mock_device.provision_state.avb_perm_attr_set) self.assertEqual(False, mock_device.provision_state.avb_locked) self.assertEqual(False, mock_device.provision_state.provisioned) # Attestation key provisioned self.status_map['at-attest-uuid'] = self.TEST_UUID atft_manager.CheckProvisionStatus(mock_device) self.assertEqual(False, mock_device.provision_state.bootloader_locked) self.assertEqual(False, mock_device.provision_state.avb_perm_attr_set) self.assertEqual(False, mock_device.provision_state.avb_locked) self.assertEqual(True, mock_device.provision_state.provisioned) # AVB locked and attestation key provisioned self.status_map['at-vboot-state'] = ( '(bootloader) bootloader-locked: 0\n' '(bootloader) bootloader-min-versions: -1,0,3\n' '(bootloader) avb-perm-attr-set: 0\n' '(bootloader) avb-locked: 1\n' '(bootloader) avb-unlock-disabled: 0\n' '(bootloader) avb-min-versions: 0:1,1:1,2:1,4097 :2,4098:2\n') self.status_map['at-attest-uuid'] = self.TEST_UUID atft_manager.CheckProvisionStatus(mock_device) self.assertEqual(False, mock_device.provision_state.bootloader_locked) self.assertEqual(False, mock_device.provision_state.avb_perm_attr_set) self.assertEqual(True, mock_device.provision_state.avb_locked) self.assertEqual(True, mock_device.provision_state.provisioned) self.assertEqual(ProvisionStatus.PROVISION_SUCCESS, mock_device.provision_status) # Permanent attributes fused self.status_map['at-vboot-state'] = ( '(bootloader) bootloader-locked: 0\n' '(bootloader) bootloader-min-versions: -1,0,3\n' '(bootloader) avb-perm-attr-set: 1\n' '(bootloader) avb-locked: 0\n' '(bootloader) avb-unlock-disabled: 0\n' '(bootloader) avb-min-versions: 0:1,1:1,2:1,4097 :2,4098:2\n') self.status_map['at-attest-uuid'] = '' atft_manager.CheckProvisionStatus(mock_device) self.assertEqual(False, mock_device.provision_state.bootloader_locked) self.assertEqual(True, mock_device.provision_state.avb_perm_attr_set) self.assertEqual(False, mock_device.provision_state.avb_locked) self.assertEqual(False, mock_device.provision_state.provisioned) # All status set self.status_map['at-vboot-state'] = ( '(bootloader) bootloader-locked: 1\n' '(bootloader) bootloader-min-versions: -1,0,3\n' '(bootloader) avb-perm-attr-set: 1\n' '(bootloader) avb-locked: 1\n' '(bootloader) avb-unlock-disabled: 0\n' '(bootloader) avb-min-versions: 0:1,1:1,2:1,4097 :2,4098:2\n') self.status_map['at-attest-uuid'] = self.TEST_UUID atft_manager.CheckProvisionStatus(mock_device) self.assertEqual(True, mock_device.provision_state.bootloader_locked) self.assertEqual(True, mock_device.provision_state.avb_perm_attr_set) self.assertEqual(True, mock_device.provision_state.avb_locked) self.assertEqual(True, mock_device.provision_state.provisioned) self.assertEqual(ProvisionStatus.PROVISION_SUCCESS, mock_device.provision_status) # Test AtftManager.Provision def MockSetProvisionSuccess(self, target): target.provision_status = ProvisionStatus.PROVISION_SUCCESS target.provision_state = ProvisionState() target.provision_state.provisioned = True def MockSetProvisionFail(self, target): target.provision_status = ProvisionStatus.PROVISION_FAILED target.provision_state = ProvisionState() target.provision_state.provisioned = False def testProvision(self): atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, self.mock_serial_mapper, self.configs) mock_atfa = MagicMock() mock_target = MagicMock() atft_manager._atfa_dev_manager = MagicMock() atft_manager.atfa_dev = mock_atfa atft_manager._GetAlgorithmList = MagicMock() atft_manager._GetAlgorithmList.return_value = [ EncryptionAlgorithm.ALGORITHM_CURVE25519 ] atft_manager.TransferContent = MagicMock() atft_manager.CheckProvisionStatus = MagicMock() atft_manager.CheckProvisionStatus.side_effect = self.MockSetProvisionSuccess atft_manager.Provision(mock_target) # Make sure atfa.SetTime is called. atft_manager._atfa_dev_manager.SetTime.assert_called_once() # Transfer content should be ATFA->target, target->ATFA, ATFA->target transfer_content_calls = [ call(mock_atfa, mock_target), call(mock_target, mock_atfa), call(mock_atfa, mock_target) ] atft_manager.TransferContent.assert_has_calls(transfer_content_calls) atfa_oem_calls = [ call('atfa-start-provisioning ' + str(EncryptionAlgorithm.ALGORITHM_CURVE25519)), call('atfa-finish-provisioning') ] target_oem_calls = [ call('at-get-ca-request'), call('at-set-ca-response') ] mock_atfa.Oem.assert_has_calls(atfa_oem_calls) mock_target.Oem.assert_has_calls(target_oem_calls) def testProvisionFailed(self): atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, self.mock_serial_mapper, self.configs) mock_atfa = MagicMock() mock_target = MagicMock() atft_manager._atfa_dev_manager = MagicMock() atft_manager.atfa_dev = mock_atfa atft_manager._GetAlgorithmList = MagicMock() atft_manager._GetAlgorithmList.return_value = [ EncryptionAlgorithm.ALGORITHM_CURVE25519 ] atft_manager.TransferContent = MagicMock() atft_manager.CheckProvisionStatus = MagicMock() atft_manager.CheckProvisionStatus.side_effect = self.MockSetProvisionFail with self.assertRaises(FastbootFailure): atft_manager.Provision(mock_target) # Test AtftManager.FuseVbootKey def MockSetFuseVbootSuccess(self, target): target.provision_status = ProvisionStatus.FUSEVBOOT_SUCCESS target.provision_state = ProvisionState() target.provision_state.bootloader_locked = True def MockSetFuseVbootFail(self, target): target.provision_status = ProvisionStatus.FUSEVBOOT_FAILED target.provision_state = ProvisionState() target.provision_state.bootloader_locked = False @patch('os.remove') @patch('tempfile.NamedTemporaryFile') def testFuseVbootKey(self, mock_create_temp_file, mock_remove): mock_file = MagicMock() mock_create_temp_file.return_value = mock_file mock_file.name = self.TEST_FILE_NAME atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, self.mock_serial_mapper, self.configs) atft_manager.product_info = ProductInfo( self.TEST_ID, self.TEST_NAME, self.TEST_ATTRIBUTE_ARRAY, self.TEST_VBOOT_KEY_ARRAY) mock_target = MagicMock() atft_manager.CheckProvisionStatus = MagicMock() atft_manager.CheckProvisionStatus.side_effect = self.MockSetFuseVbootSuccess atft_manager.FuseVbootKey(mock_target) mock_file.write.assert_called_once_with(self.TEST_VBOOT_KEY_ARRAY) mock_target.Download.assert_called_once_with(self.TEST_FILE_NAME) mock_remove.assert_called_once_with(self.TEST_FILE_NAME) mock_target.Oem.assert_called_once_with('fuse at-bootloader-vboot-key') @patch('os.remove') @patch('tempfile.NamedTemporaryFile') def testFuseVbootKeyFailed(self, mock_create_temp_file, _): mock_file = MagicMock() mock_create_temp_file.return_value = mock_file mock_file.name = self.TEST_FILE_NAME atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, self.mock_serial_mapper, self.configs) atft_manager.product_info = ProductInfo( self.TEST_ID, self.TEST_NAME, self.TEST_ATTRIBUTE_ARRAY, self.TEST_VBOOT_KEY_ARRAY) mock_target = MagicMock() atft_manager.CheckProvisionStatus = MagicMock() atft_manager.CheckProvisionStatus.side_effect = self.MockSetFuseVbootFail with self.assertRaises(FastbootFailure): atft_manager.FuseVbootKey(mock_target) def testFuseVbootKeyNoProduct(self): atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, self.mock_serial_mapper, self.configs) mock_target = MagicMock() atft_manager.product_info = None with self.assertRaises(ProductNotSpecifiedException): atft_manager.FuseVbootKey(mock_target) @patch('os.remove') @patch('tempfile.NamedTemporaryFile') def testFuseVbootKeyFastbootFailure(self, mock_create_temp_file, _): mock_file = MagicMock() mock_create_temp_file.return_value = mock_file mock_file.name = self.TEST_FILE_NAME atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, self.mock_serial_mapper, self.configs) atft_manager.product_info = ProductInfo( self.TEST_ID, self.TEST_NAME, self.TEST_ATTRIBUTE_ARRAY, self.TEST_VBOOT_KEY_ARRAY) mock_target = MagicMock() atft_manager.CheckProvisionStatus = MagicMock() mock_target.Oem.side_effect = FastbootFailure('') with self.assertRaises(FastbootFailure): atft_manager.FuseVbootKey(mock_target) self.assertEqual( ProvisionStatus.FUSEVBOOT_FAILED, mock_target.provision_status) # Test AtftManager.FusePermAttr def MockSetFuseAttrSuccess(self, target): target.provision_status = ProvisionStatus.FUSEATTR_SUCCESS target.provision_state = ProvisionState() target.provision_state.avb_perm_attr_set = True def MockSetFuseAttrFail(self, target): target.provision_status = ProvisionStatus.FUSEATTR_FAILED target.provision_state = ProvisionState() target.provision_state.avb_perm_attr_set = False @patch('os.remove') @patch('tempfile.NamedTemporaryFile') def testFusePermAttr(self, mock_create_temp_file, mock_remove): mock_file = MagicMock() mock_create_temp_file.return_value = mock_file mock_file.name = self.TEST_FILE_NAME atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, self.mock_serial_mapper, self.configs) atft_manager.product_info = ProductInfo( self.TEST_ID, self.TEST_NAME, self.TEST_ATTRIBUTE_ARRAY, self.TEST_VBOOT_KEY_ARRAY) mock_target = MagicMock() atft_manager.CheckProvisionStatus = MagicMock() atft_manager.CheckProvisionStatus.side_effect = self.MockSetFuseAttrSuccess atft_manager.FusePermAttr(mock_target) mock_file.write.assert_called_once_with(self.TEST_ATTRIBUTE_ARRAY) mock_target.Download.assert_called_once_with(self.TEST_FILE_NAME) mock_remove.assert_called_once_with(self.TEST_FILE_NAME) mock_target.Oem.assert_called_once_with('fuse at-perm-attr') @patch('os.remove') @patch('tempfile.NamedTemporaryFile') def testFusePermAttrFail(self, mock_create_temp_file, _): mock_file = MagicMock() mock_create_temp_file.return_value = mock_file mock_file.name = self.TEST_FILE_NAME atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, self.mock_serial_mapper, self.configs) atft_manager.product_info = ProductInfo( self.TEST_ID, self.TEST_NAME, self.TEST_ATTRIBUTE_ARRAY, self.TEST_VBOOT_KEY_ARRAY) mock_target = MagicMock() atft_manager.CheckProvisionStatus = MagicMock() atft_manager.CheckProvisionStatus.side_effect = self.MockSetFuseAttrFail with self.assertRaises(FastbootFailure): atft_manager.FusePermAttr(mock_target) def testFusePermAttrNoProduct(self): atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, self.mock_serial_mapper, self.configs) mock_target = MagicMock() atft_manager.product_info = None with self.assertRaises(ProductNotSpecifiedException): atft_manager.FusePermAttr(mock_target) @patch('os.remove') @patch('tempfile.NamedTemporaryFile') def testFusePermAttrFastbootFailure(self, mock_create_temp_file, _): mock_file = MagicMock() mock_create_temp_file.return_value = mock_file mock_file.name = self.TEST_FILE_NAME atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, self.mock_serial_mapper, self.configs) atft_manager.product_info = ProductInfo( self.TEST_ID, self.TEST_NAME, self.TEST_ATTRIBUTE_ARRAY, self.TEST_VBOOT_KEY_ARRAY) mock_target = MagicMock() atft_manager.CheckProvisionStatus = MagicMock() mock_target.Oem.side_effect = FastbootFailure('') with self.assertRaises(FastbootFailure): atft_manager.FusePermAttr(mock_target) self.assertEqual( ProvisionStatus.FUSEATTR_FAILED, mock_target.provision_status) # Test AtftManager.LockAvb def MockSetLockAvbSuccess(self, target): target.provision_status = ProvisionStatus.LOCKAVB_SUCCESS target.provision_state = ProvisionState() target.provision_state.avb_locked = True def MockSetLockAvbFail(self, target): target.provision_status = ProvisionStatus.LOCKAVB_FAILED target.provision_state = ProvisionState() target.provision_state.avb_locked = False def testLockAvb(self): atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, self.mock_serial_mapper, self.configs) mock_target = MagicMock() atft_manager.CheckProvisionStatus = MagicMock() atft_manager.CheckProvisionStatus.side_effect = self.MockSetLockAvbSuccess atft_manager.LockAvb(mock_target) mock_target.Oem.assert_called_once_with('at-lock-vboot') self.assertEqual( ProvisionStatus.LOCKAVB_SUCCESS, mock_target.provision_status) def testLockAvbFail(self): atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, self.mock_serial_mapper, self.configs) mock_target = MagicMock() atft_manager.CheckProvisionStatus = MagicMock() atft_manager.CheckProvisionStatus.side_effect = self.MockSetLockAvbFail with self.assertRaises(FastbootFailure): atft_manager.LockAvb(mock_target) def testLockAvbFastbootFailure(self): atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, self.mock_serial_mapper, self.configs) mock_target = MagicMock() atft_manager.CheckProvisionStatus = MagicMock() atft_manager.CheckProvisionStatus.side_effect = self.MockSetLockAvbSuccess mock_target.Oem.side_effect = FastbootFailure('') with self.assertRaises(FastbootFailure): atft_manager.LockAvb(mock_target) self.assertEqual( ProvisionStatus.LOCKAVB_FAILED, mock_target.provision_status) # Test AtftManager.Reboot class MockTimer(object): def __init__(self, interval, callback): self.interval = interval self.callback = callback def start(self): pass def refresh(self): if self.callback: self.callback() def cancel(self): self.callback = None def mock_create_timer(self, interval, callback): self.mock_timer_instance = self.MockTimer(interval, callback) return self.mock_timer_instance @patch('threading.Timer') def testRebootSuccess(self, mock_timer): self.mock_timer_instance = None atft_manager = atftman.AtftManager( self.FastbootDeviceTemplate, self.mock_serial_mapper, self.configs) timeout = 1 atft_manager.stable_serials = [self.TEST_SERIAL] mock_fastboot = MagicMock() test_device = atftman.DeviceInfo( mock_fastboot, self.TEST_SERIAL, self.TEST_LOCATION) atft_manager.target_devs.append(test_device) mock_success = MagicMock() mock_fail = MagicMock() mock_timer.side_effect = self.mock_create_timer atft_manager.Reboot(test_device, timeout, mock_success, mock_fail) # During the reboot, the status should be REBOOT_ING. self.assertEqual(1, len(atft_manager.target_devs)) self.assertEqual( ProvisionStatus.REBOOT_ING, atft_manager.target_devs[0].provision_status) self.assertEqual( self.TEST_SERIAL, atft_manager.target_devs[0].serial_number) # After the device reappear, the status should be REBOOT_SUCCESS. atft_manager.stable_serials = [self.TEST_SERIAL] atft_manager._HandleRebootCallbacks() # mock timeout event. self.mock_timer_instance.refresh() self.assertEqual(1, len(atft_manager.target_devs)) self.assertEqual( ProvisionStatus.REBOOT_SUCCESS, atft_manager.target_devs[0].provision_status) mock_fastboot.Reboot.assert_called_once() # Success should be called, fail should not. mock_success.assert_called_once() mock_fail.assert_not_called() @patch('threading.Timer') def testRebootTimeout(self, mock_timer): self.mock_timer_instance = None atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, self.mock_serial_mapper, self.configs) timeout = 1 atft_manager.stable_serials.append(self.TEST_SERIAL) mock_fastboot = MagicMock() test_device = atftman.DeviceInfo( mock_fastboot, self.TEST_SERIAL, self.TEST_LOCATION) atft_manager.target_devs.append(test_device) mock_success = MagicMock() mock_fail = MagicMock() # Status would be checked after reboot. We assume it's in FUSEVBOOT_SUCCESS atft_manager.CheckProvisionStatus = MagicMock() atft_manager.CheckProvisionStatus.side_effect = self.MockSetFuseVbootSuccess mock_timer.side_effect = self.mock_create_timer atft_manager.Reboot(test_device, timeout, mock_success, mock_fail) atft_manager.stable_serials = [] atft_manager._HandleRebootCallbacks() # mock timeout event. self.mock_timer_instance.refresh() self.assertEqual(0, len(atft_manager.target_devs)) mock_success.assert_not_called() mock_fail.assert_called_once() @patch('threading.Timer') def testRebootTimeoutBeforeRefresh(self, mock_timer): self.mock_timer_instance = None atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, self.mock_serial_mapper, self.configs) timeout = 1 atft_manager.stable_serials.append(self.TEST_SERIAL) mock_fastboot = MagicMock() test_device = atftman.DeviceInfo( mock_fastboot, self.TEST_SERIAL, self.TEST_LOCATION) atft_manager.target_devs.append(test_device) mock_success = MagicMock() mock_fail = MagicMock() # Status would be checked after reboot. We assume it's in FUSEVBOOT_SUCCESS atft_manager.CheckProvisionStatus = MagicMock() atft_manager.CheckProvisionStatus.side_effect = self.MockSetFuseVbootSuccess mock_timer.side_effect = self.mock_create_timer atft_manager.Reboot(test_device, timeout, mock_success, mock_fail) atft_manager.stable_serials = [] # mock timeout event. self.mock_timer_instance.refresh() # mock refresh event. atft_manager._HandleRebootCallbacks() self.assertEqual(0, len(atft_manager.target_devs)) mock_success.assert_not_called() mock_fail.assert_called_once() @patch('threading.Timer') def testRebootFailure(self, mock_timer): self.mock_timer_instance = None atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, self.mock_serial_mapper, self.configs) mock_target = MagicMock() mock_target.serial_number = self.TEST_SERIAL timeout = 1 atft_manager.stable_serials.append(self.TEST_SERIAL) test_device = atftman.DeviceInfo(None, self.TEST_SERIAL, self.TEST_LOCATION) atft_manager.target_devs.append(test_device) mock_success = MagicMock() mock_fail = MagicMock() # Status would be checked after reboot. We assume it's in FUSEVBOOT_SUCCESS atft_manager.CheckProvisionStatus = MagicMock() atft_manager.CheckProvisionStatus.side_effect = self.MockSetFuseVbootSuccess mock_timer.side_effect = self.mock_create_timer mock_target.Reboot.side_effect = FastbootFailure('') with self.assertRaises(FastbootFailure): atft_manager.Reboot(mock_target, timeout, mock_success, mock_fail) # There should be no timeout timer. self.assertEqual(None, self.mock_timer_instance) # mock refresh event. atft_manager._HandleRebootCallbacks() mock_success.assert_not_called() mock_fail.assert_not_called() # Test AtftManager.ProcessProductAttributesFile def testProcessProductAttributesFile(self): test_content = ( '{' ' "productName": "%s",' ' "productConsoleId": "%s",' ' "productPermanentAttribute": "%s",' ' "bootloaderPublicKey": "%s",' ' "creationTime": ""' '}') % (self.TEST_NAME, self.TEST_ID, self.TEST_ATTRIBUTE_STRING, self.TEST_VBOOT_KEY_STRING) atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, self.mock_serial_mapper, self.configs) atft_manager.ProcessProductAttributesFile(test_content) self.assertEqual(self.TEST_NAME, atft_manager.product_info.product_name) self.assertEqual(self.TEST_ID, atft_manager.product_info.product_id) self.assertEqual(self.TEST_ATTRIBUTE_ARRAY, atft_manager.product_info.product_attributes) self.assertEqual(self.TEST_VBOOT_KEY_ARRAY, atft_manager.product_info.vboot_key) def testProcessProductAttributesFileWrongJSON(self): test_content = ( '{' ' "productName": "%s",' ' "productConsoleId": "%s",' ' "productPermanentAttribute": "%s",' ' "bootloaderPublicKey": "%s",' ' "creationTime": ""' '') % (self.TEST_NAME, self.TEST_ID, self.TEST_ATTRIBUTE_STRING, self.TEST_VBOOT_KEY_STRING) atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, self.mock_serial_mapper, self.configs) with self.assertRaises(ProductAttributesFileFormatError): atft_manager.ProcessProductAttributesFile(test_content) def testProcessProductAttributesFileMissingField(self): test_content = ( '{' ' "productConsoleId": "%s",' ' "productPermanentAttribute": "%s",' ' "bootloaderPublicKey": "%s",' ' "creationTime": ""' '}') % (self.TEST_ID, self.TEST_ATTRIBUTE_STRING, self.TEST_VBOOT_KEY_STRING) atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, self.mock_serial_mapper, self.configs) with self.assertRaises(ProductAttributesFileFormatError): atft_manager.ProcessProductAttributesFile(test_content) def testProcessProductAttributesFileWrongLength(self): test_content = ( '{' ' "productName": "%s",' ' "productConsoleId": "%s",' ' "productPermanentAttribute": "%s",' ' "bootloaderPublicKey": "%s",' ' "creationTime": ""' '}') % (self.TEST_NAME, self.TEST_ID, base64.standard_b64encode(bytearray(1053)), self.TEST_VBOOT_KEY_STRING) atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, self.mock_serial_mapper, self.configs) with self.assertRaises(ProductAttributesFileFormatError) as e: atft_manager.ProcessProductAttributesFile(test_content) def testProcessProductAttributesFileWrongBase64(self): test_content = ( '{' ' "productName": "%s",' ' "productConsoleId": "%s",' ' "productPermanentAttribute": "%s",' ' "bootloaderPublicKey": "%s",' ' "creationTime": ""' '}') % (self.TEST_NAME, self.TEST_ID, self.TEST_ATTRIBUTE_STRING, '12') atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, self.mock_serial_mapper, self.configs) with self.assertRaises(ProductAttributesFileFormatError): atft_manager.ProcessProductAttributesFile(test_content) if __name__ == '__main__': unittest.main()