# Copyright 2017 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Unit test for atft manager."""
import base64
import unittest

import atftman

from atftman import EncryptionAlgorithm
from atftman import ProductInfo
from atftman import ProvisionState
from atftman import ProvisionStatus
from fastboot_exceptions import DeviceNotFoundException
from fastboot_exceptions import FastbootFailure
from fastboot_exceptions import NoAlgorithmAvailableException
from fastboot_exceptions import ProductAttributesFileFormatError
from fastboot_exceptions import ProductNotSpecifiedException
from mock import call
from mock import MagicMock
from mock import patch

# Module-level registry of fake "files"; the TransferContent test helpers
# append to, check against and remove from this list.
files = []


class AtftManTest(unittest.TestCase):
  """Tests for atftman, run against stubbed or mocked fastboot devices."""

  ATFA_TEST_SERIAL = 'ATFA_TEST_SERIAL'
  TEST_TMP_FOLDER = '/tmp/TMPTEST/'
  TEST_SERIAL = 'TEST_SERIAL'
  TEST_SERIAL2 = 'TEST_SERIAL2'
  TEST_SERIAL3 = 'TEST_SERIAL3'
  TEST_UUID = 'TEST-UUID'
  TEST_LOCATION = 'BUS1-PORT1'
  TEST_LOCATION2 = 'BUS2-PORT1'
  TEST_LOCATION3 = 'BUS1-PORT2'
  TEST_ID = '00000000000000000000000000000000'
  TEST_ID_ARRAY = bytearray.fromhex(TEST_ID)
  TEST_NAME = 'name'
  TEST_FILE_NAME = 'filename'
  TEST_ATTRIBUTE_ARRAY = bytearray(1052)
  TEST_VBOOT_KEY_ARRAY = bytearray(128)
  TEST_ATTRIBUTE_STRING = base64.standard_b64encode(TEST_ATTRIBUTE_ARRAY)
  TEST_VBOOT_KEY_STRING = base64.standard_b64encode(TEST_VBOOT_KEY_ARRAY)

  class FastbootDeviceTemplate(object):
    """Stub fastboot device class handed to AtftManager.

    Every operation is a no-op; individual tests patch the methods they
    care about (e.g. ListDevices, Upload, Download).
    """

    @staticmethod
    def ListDevices():
      pass

    def __init__(self, serial_number):
      self.serial_number = serial_number

    def Oem(self, oem_command, err_to_out):
      pass

    def Upload(self, file_path):
      pass

    def GetVar(self, var):
      pass

    def Download(self, file_path):
      pass

    def Disconnect(self):
      pass

    def GetHostOs(self):
      return 'Windows'

    def __del__(self):
      pass

  def MockInit(self, serial_number):
    """Build a mocked fastboot device instance.

    Used as the side_effect of a mocked fastboot device class, so that each
    "construction" yields a fresh MagicMock.

    Args:
      serial_number: Serial number to expose on the mocked device.

    Returns:
      A MagicMock with `serial_number` set and `GetHostOs()` returning
      'Windows'.
    """
    mock_instance = MagicMock()
    mock_instance.serial_number = serial_number
    mock_instance.GetHostOs = MagicMock()
    mock_instance.GetHostOs.return_value = 'Windows'
    return mock_instance

  def setUp(self):
    """Create the shared serial-mapper mock and default manager configs."""
    self.mock_serial_mapper = MagicMock()
    self.mock_serial_instance = MagicMock()
    self.mock_serial_mapper.return_value = self.mock_serial_instance
    self.mock_serial_instance.get_serial_map.return_value = []
    self.status_map = {}
    self.mock_timer_instance = None
    self.configs = {}
    self.configs['ATFA_REBOOT_TIMEOUT'] = 30
    self.configs['DEFAULT_KEY_THRESHOLD'] = 100

  # Test AtftManager.ListDevices
  class MockInstantTimer(object):
    """Stand-in for threading.Timer that fires synchronously on start()."""

    def __init__(self, timeout, callback):
      self.timeout = timeout
      self.callback = callback

    def start(self):
      # No waiting: invoke the callback immediately in the calling thread.
      self.callback()

  def MockCreateInstantTimer(self, timeout, callback):
    """side_effect for the patched threading.Timer; returns an instant timer."""
    return self.MockInstantTimer(timeout, callback)

  @patch('threading.Timer')
  @patch('__main__.AtftManTest.FastbootDeviceTemplate.ListDevices')
  def testListDevicesNormal(self, mock_list_devices, mock_create_timer):
    """One ATFA and one target are listed; the OS is not set when it matches."""
    mock_create_timer.side_effect = self.MockCreateInstantTimer
    atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate,
                                       self.mock_serial_mapper, self.configs)
    atft_manager._GetOs = MagicMock()
    atft_manager._SetOs = MagicMock()
    atft_manager._GetOs.return_value = 'Windows'
    mock_list_devices.return_value = [self.TEST_SERIAL, self.ATFA_TEST_SERIAL]
    # ListDevices is called twice: the pending-device tests in this file
    # expect a serial seen in only one refresh not to be listed yet.
    atft_manager.ListDevices()
    atft_manager.ListDevices()
    self.assertEqual(atft_manager.atfa_dev.serial_number, self.ATFA_TEST_SERIAL)
    self.assertEqual(1, len(atft_manager.target_devs))
    self.assertEqual(atft_manager.target_devs[0].serial_number,
                     self.TEST_SERIAL)
    self.assertEqual(None, atft_manager._atfa_dev_setting)
    # The reboot lock must be free again, i.e. acquirable without blocking.
    self.assertEqual(True, atft_manager._atfa_reboot_lock.acquire(False))
    atft_manager._SetOs.assert_not_called()

  @patch('threading.Timer')
  @patch('__main__.AtftManTest.FastbootDeviceTemplate.ListDevices')
  def testListDevicesNormalSetOs(self, mock_list_devices, mock_create_timer):
    """_SetOs is called with the host OS when _GetOs reports a different one."""
    mock_create_timer.side_effect = self.MockCreateInstantTimer
    atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate,
                                       self.mock_serial_mapper, self.configs)
    atft_manager._GetOs = MagicMock()
    atft_manager._SetOs = MagicMock()
    atft_manager._GetOs.return_value = 'Linux'
    mock_list_devices.return_value = [self.TEST_SERIAL, self.ATFA_TEST_SERIAL]
    atft_manager.ListDevices()
    atft_manager.ListDevices()
    atft_manager._GetOs.assert_called()
    # The stub device's GetHostOs() returns 'Windows'.
    atft_manager._SetOs.assert_called_once_with(
        atft_manager.atfa_dev, 'Windows')
    self.assertEqual(atft_manager.atfa_dev.serial_number, self.ATFA_TEST_SERIAL)

  @patch('threading.Timer')
  @patch('__main__.AtftManTest.FastbootDeviceTemplate.ListDevices')
  def testListDevicesATFA(self, mock_list_devices, mock_create_timer):
    """Only an ATFA is connected: atfa_dev is set and no targets are listed."""
    mock_create_timer.side_effect = self.MockCreateInstantTimer
    atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate,
                                       self.mock_serial_mapper, self.configs)
    atft_manager._GetOs = MagicMock()
    atft_manager._GetOs.return_value = 'Windows'
    mock_list_devices.return_value = [self.ATFA_TEST_SERIAL]
    atft_manager.ListDevices()
    atft_manager.ListDevices()
    self.assertEqual(atft_manager.atfa_dev.serial_number, self.ATFA_TEST_SERIAL)
    self.assertEqual(0, len(atft_manager.target_devs))

@patch('threading.Timer') 168 @patch('__main__.AtftManTest.FastbootDeviceTemplate.ListDevices') 169 def testListDevicesTarget(self, mock_list_devices, mock_create_timer): 170 mock_create_timer.side_effect = self.MockCreateInstantTimer 171 atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, 172 self.mock_serial_mapper, self.configs) 173 atft_manager._GetOs = MagicMock() 174 atft_manager._GetOs.return_value = 'Windows' 175 mock_list_devices.return_value = [self.TEST_SERIAL] 176 atft_manager.ListDevices() 177 atft_manager.ListDevices() 178 self.assertEqual(atft_manager.atfa_dev, None) 179 self.assertEqual(1, len(atft_manager.target_devs)) 180 self.assertEqual(atft_manager.target_devs[0].serial_number, 181 self.TEST_SERIAL) 182 183 @patch('threading.Timer') 184 @patch('__main__.AtftManTest.FastbootDeviceTemplate.ListDevices') 185 def testListDevicesMultipleTargets(self, mock_list_devices, mock_create_timer): 186 mock_create_timer.side_effect = self.MockCreateInstantTimer 187 atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, 188 self.mock_serial_mapper, self.configs) 189 atft_manager._GetOs = MagicMock() 190 atft_manager._GetOs.return_value = 'Windows' 191 mock_list_devices.return_value = [self.TEST_SERIAL, self.TEST_SERIAL2] 192 atft_manager.ListDevices() 193 atft_manager.ListDevices() 194 self.assertEqual(atft_manager.atfa_dev, None) 195 self.assertEqual(2, len(atft_manager.target_devs)) 196 self.assertEqual(atft_manager.target_devs[0].serial_number, 197 self.TEST_SERIAL) 198 self.assertEqual(atft_manager.target_devs[1].serial_number, 199 self.TEST_SERIAL2) 200 201 @patch('threading.Timer') 202 def testListDevicesChangeNorm(self, mock_create_timer): 203 mock_create_timer.side_effect = self.MockCreateInstantTimer 204 mock_fastboot = MagicMock() 205 mock_fastboot.side_effect = self.MockInit 206 atft_manager = atftman.AtftManager(mock_fastboot, self.mock_serial_mapper, 207 self.configs) 208 atft_manager._GetOs = MagicMock() 209 
atft_manager._GetOs.return_value = 'Windows' 210 mock_fastboot.ListDevices.return_value = [self.ATFA_TEST_SERIAL] 211 atft_manager.ListDevices() 212 atft_manager.ListDevices() 213 mock_fastboot.ListDevices.return_value = [self.TEST_SERIAL] 214 atft_manager.ListDevices() 215 atft_manager.ListDevices() 216 self.assertEqual(atft_manager.atfa_dev, None) 217 self.assertEqual(1, len(atft_manager.target_devs)) 218 self.assertEqual(atft_manager.target_devs[0].serial_number, 219 self.TEST_SERIAL) 220 self.assertEqual(2, mock_fastboot.call_count) 221 222 @patch('threading.Timer') 223 def testListDevicesChangeAdd(self, mock_create_timer): 224 mock_create_timer.side_effect = self.MockCreateInstantTimer 225 mock_fastboot = MagicMock() 226 mock_fastboot.side_effect = self.MockInit 227 atft_manager = atftman.AtftManager(mock_fastboot, self.mock_serial_mapper, 228 self.configs) 229 atft_manager._GetOs = MagicMock() 230 atft_manager._GetOs.return_value = 'Windows' 231 mock_fastboot.ListDevices.return_value = [self.ATFA_TEST_SERIAL] 232 atft_manager.ListDevices() 233 atft_manager.ListDevices() 234 mock_fastboot.ListDevices.return_value = [ 235 self.ATFA_TEST_SERIAL, self.TEST_SERIAL 236 ] 237 atft_manager.ListDevices() 238 atft_manager.ListDevices() 239 self.assertEqual(atft_manager.atfa_dev.serial_number, self.ATFA_TEST_SERIAL) 240 self.assertEqual(1, len(atft_manager.target_devs)) 241 self.assertEqual(atft_manager.target_devs[0].serial_number, 242 self.TEST_SERIAL) 243 self.assertEqual(2, mock_fastboot.call_count) 244 245 @patch('threading.Timer') 246 def testListDevicesChangeAddATFA(self, mock_create_timer): 247 mock_create_timer.side_effect = self.MockCreateInstantTimer 248 mock_fastboot = MagicMock() 249 mock_fastboot.side_effect = self.MockInit 250 atft_manager = atftman.AtftManager(mock_fastboot, self.mock_serial_mapper, 251 self.configs) 252 atft_manager._GetOs = MagicMock() 253 atft_manager._GetOs.return_value = 'Windows' 254 mock_fastboot.ListDevices.return_value = 
[self.TEST_SERIAL] 255 atft_manager.ListDevices() 256 atft_manager.ListDevices() 257 mock_fastboot.ListDevices.return_value = [ 258 self.ATFA_TEST_SERIAL, self.TEST_SERIAL 259 ] 260 atft_manager.ListDevices() 261 atft_manager.ListDevices() 262 self.assertEqual(atft_manager.atfa_dev.serial_number, self.ATFA_TEST_SERIAL) 263 self.assertEqual(1, len(atft_manager.target_devs)) 264 self.assertEqual(atft_manager.target_devs[0].serial_number, 265 self.TEST_SERIAL) 266 self.assertEqual(2, mock_fastboot.call_count) 267 268 @patch('threading.Timer') 269 def testListDevicesChangeCommon(self, mock_create_timer): 270 mock_create_timer.side_effect = self.MockCreateInstantTimer 271 mock_fastboot = MagicMock() 272 mock_fastboot.side_effect = self.MockInit 273 atft_manager = atftman.AtftManager(mock_fastboot, self.mock_serial_mapper, 274 self.configs) 275 atft_manager._GetOs = MagicMock() 276 atft_manager._GetOs.return_value = 'Windows' 277 mock_fastboot.ListDevices.return_value = [ 278 self.ATFA_TEST_SERIAL, self.TEST_SERIAL 279 ] 280 atft_manager.ListDevices() 281 atft_manager.ListDevices() 282 mock_fastboot.ListDevices.return_value = [ 283 self.TEST_SERIAL2, self.TEST_SERIAL 284 ] 285 atft_manager.ListDevices() 286 atft_manager.ListDevices() 287 self.assertEqual(atft_manager.atfa_dev, None) 288 self.assertEqual(2, len(atft_manager.target_devs)) 289 self.assertEqual(atft_manager.target_devs[0].serial_number, 290 self.TEST_SERIAL) 291 self.assertEqual(atft_manager.target_devs[1].serial_number, 292 self.TEST_SERIAL2) 293 self.assertEqual(3, mock_fastboot.call_count) 294 295 @patch('threading.Timer') 296 def testListDevicesChangeCommonATFA(self, mock_create_timer): 297 mock_create_timer.side_effect = self.MockCreateInstantTimer 298 mock_fastboot = MagicMock() 299 mock_fastboot.side_effect = self.MockInit 300 atft_manager = atftman.AtftManager(mock_fastboot, self.mock_serial_mapper, 301 self.configs) 302 atft_manager._GetOs = MagicMock() 303 atft_manager._GetOs.return_value = 
'Windows' 304 mock_fastboot.ListDevices.return_value = [ 305 self.ATFA_TEST_SERIAL, self.TEST_SERIAL 306 ] 307 atft_manager.ListDevices() 308 atft_manager.ListDevices() 309 mock_fastboot.ListDevices.return_value = [ 310 self.ATFA_TEST_SERIAL, self.TEST_SERIAL2 311 ] 312 atft_manager.ListDevices() 313 # First refresh, TEST_SERIAL should disappear 314 self.assertEqual(0, len(atft_manager.target_devs)) 315 # Second refresh, TEST_SERIAL2 should be added 316 atft_manager.ListDevices() 317 self.assertEqual(atft_manager.atfa_dev.serial_number, self.ATFA_TEST_SERIAL) 318 self.assertEqual(1, len(atft_manager.target_devs)) 319 self.assertEqual(atft_manager.target_devs[0].serial_number, 320 self.TEST_SERIAL2) 321 self.assertEqual(3, mock_fastboot.call_count) 322 323 @patch('threading.Timer') 324 def testListDevicesRemoveATFA(self, mock_create_timer): 325 mock_create_timer.side_effect = self.MockCreateInstantTimer 326 mock_fastboot = MagicMock() 327 mock_fastboot.side_effect = self.MockInit 328 atft_manager = atftman.AtftManager(mock_fastboot, self.mock_serial_mapper, 329 self.configs) 330 atft_manager._GetOs = MagicMock() 331 atft_manager._GetOs.return_value = 'Windows' 332 mock_fastboot.ListDevices.return_value = [ 333 self.ATFA_TEST_SERIAL, self.TEST_SERIAL 334 ] 335 atft_manager.ListDevices() 336 atft_manager.ListDevices() 337 mock_fastboot.ListDevices.return_value = [self.TEST_SERIAL] 338 atft_manager.ListDevices() 339 self.assertEqual(atft_manager.atfa_dev, None) 340 self.assertEqual(1, len(atft_manager.target_devs)) 341 self.assertEqual(atft_manager.target_devs[0].serial_number, 342 self.TEST_SERIAL) 343 344 @patch('threading.Timer') 345 def testListDevicesRemoveDevice(self, mock_create_timer): 346 mock_create_timer.side_effect = self.MockCreateInstantTimer 347 mock_fastboot = MagicMock() 348 mock_fastboot.side_effect = self.MockInit 349 atft_manager = atftman.AtftManager(mock_fastboot, self.mock_serial_mapper, 350 self.configs) 351 atft_manager._GetOs = MagicMock() 352 
atft_manager._GetOs.return_value = 'Windows' 353 mock_fastboot.ListDevices.return_value = [ 354 self.ATFA_TEST_SERIAL, self.TEST_SERIAL 355 ] 356 atft_manager.ListDevices() 357 atft_manager.ListDevices() 358 mock_fastboot.ListDevices.return_value = [self.ATFA_TEST_SERIAL] 359 atft_manager.ListDevices() 360 self.assertEqual(atft_manager.atfa_dev.serial_number, self.ATFA_TEST_SERIAL) 361 self.assertEqual(0, len(atft_manager.target_devs)) 362 363 @patch('threading.Timer') 364 def testListDevicesPendingRemove(self, mock_create_timer): 365 mock_create_timer.side_effect = self.MockCreateInstantTimer 366 mock_fastboot = MagicMock() 367 mock_fastboot.side_effect = self.MockInit 368 atft_manager = atftman.AtftManager(mock_fastboot, self.mock_serial_mapper, 369 self.configs) 370 atft_manager._GetOs = MagicMock() 371 atft_manager._GetOs.return_value = 'Windows' 372 mock_fastboot.ListDevices.return_value = [self.TEST_SERIAL] 373 atft_manager.ListDevices() 374 # Just appear once, should not be in target device list. 375 self.assertEqual(0, len(atft_manager.target_devs)) 376 377 @patch('threading.Timer') 378 def testListDevicesPendingAdd(self, mock_create_timer): 379 mock_create_timer.side_effect = self.MockCreateInstantTimer 380 mock_fastboot = MagicMock() 381 mock_fastboot.side_effect = self.MockInit 382 atft_manager = atftman.AtftManager(mock_fastboot, self.mock_serial_mapper, 383 self.configs) 384 atft_manager._GetOs = MagicMock() 385 atft_manager._GetOs.return_value = 'Windows' 386 mock_fastboot.ListDevices.return_value = [self.TEST_SERIAL] 387 atft_manager.ListDevices() 388 self.assertEqual(0, len(atft_manager.target_devs)) 389 mock_fastboot.ListDevices.return_value = [ 390 self.TEST_SERIAL, self.TEST_SERIAL2 391 ] 392 # TEST_SERIAL appears twice, should be in the list. 393 # TEST_SERIAL2 just appears once, should not be in the list. 
394 atft_manager.ListDevices() 395 self.assertEqual(1, len(atft_manager.target_devs)) 396 397 @patch('threading.Timer') 398 def testListDevicesPendingTemp(self, mock_create_timer): 399 mock_create_timer.side_effect = self.MockCreateInstantTimer 400 mock_fastboot = MagicMock() 401 mock_fastboot.side_effect = self.MockInit 402 atft_manager = atftman.AtftManager(mock_fastboot, self.mock_serial_mapper, 403 self.configs) 404 atft_manager._GetOs = MagicMock() 405 atft_manager._GetOs.return_value = 'Windows' 406 mock_fastboot.ListDevices.return_value = [self.TEST_SERIAL] 407 atft_manager.ListDevices() 408 self.assertEqual(0, len(atft_manager.target_devs)) 409 mock_fastboot.ListDevices.return_value = [self.TEST_SERIAL2] 410 atft_manager.ListDevices() 411 # Nothing appears twice. 412 self.assertEqual(0, len(atft_manager.target_devs)) 413 414 def mockSetSerialMapper(self, serial_map): 415 self.serial_map = {} 416 for serial in serial_map: 417 self.serial_map[serial.lower()] = serial_map[serial] 418 419 def mockGetLocation(self, serial): 420 serial_lower = serial.lower() 421 if serial_lower in self.serial_map: 422 return self.serial_map[serial_lower] 423 return None 424 425 @patch('threading.Timer') 426 def testListDevicesLocation(self, mock_create_timer): 427 mock_create_timer.side_effect = self.MockCreateInstantTimer 428 mock_serial_mapper = MagicMock() 429 smap = { 430 self.ATFA_TEST_SERIAL: self.TEST_LOCATION, 431 self.TEST_SERIAL: self.TEST_LOCATION2 432 } 433 mock_serial_instance = MagicMock() 434 mock_serial_mapper.return_value = mock_serial_instance 435 mock_serial_instance.refresh_serial_map.side_effect = ( 436 lambda serial_map=smap: self.mockSetSerialMapper(serial_map)) 437 mock_serial_instance.get_location.side_effect = self.mockGetLocation 438 mock_fastboot = MagicMock() 439 mock_fastboot.side_effect = self.MockInit 440 atft_manager = atftman.AtftManager( 441 mock_fastboot, mock_serial_mapper, self.configs) 442 atft_manager._GetOs = MagicMock() 443 
atft_manager._GetOs.return_value = 'Windows' 444 mock_fastboot.ListDevices.return_value = [ 445 self.ATFA_TEST_SERIAL, self.TEST_SERIAL 446 ] 447 atft_manager.ListDevices() 448 atft_manager.ListDevices() 449 self.assertEqual(atft_manager.atfa_dev.location, self.TEST_LOCATION) 450 self.assertEqual(atft_manager.target_devs[0].location, self.TEST_LOCATION2) 451 452 # Test _SortTargetDevices 453 def testSortDevicesDefault(self): 454 mock_serial_mapper = MagicMock() 455 mock_serial_instance = MagicMock() 456 mock_serial_mapper.return_value = mock_serial_instance 457 smap = { 458 self.TEST_SERIAL: self.TEST_LOCATION, 459 self.TEST_SERIAL2: self.TEST_LOCATION2, 460 self.TEST_SERIAL3: self.TEST_LOCATION3 461 } 462 mock_serial_instance.refresh_serial_map.side_effect = ( 463 lambda serial_map=smap: self.mockSetSerialMapper(serial_map)) 464 mock_serial_instance.get_location.side_effect = self.mockGetLocation 465 mock_fastboot = MagicMock() 466 mock_fastboot.side_effect = self.MockInit 467 atft_manager = atftman.AtftManager( 468 mock_fastboot, mock_serial_mapper, self.configs) 469 mock_fastboot.ListDevices.return_value = [ 470 self.TEST_SERIAL, self.TEST_SERIAL2, self.TEST_SERIAL3 471 ] 472 atft_manager.ListDevices() 473 atft_manager.ListDevices() 474 self.assertEqual(3, len(atft_manager.target_devs)) 475 self.assertEqual(self.TEST_LOCATION, atft_manager.target_devs[0].location) 476 self.assertEqual(self.TEST_LOCATION3, atft_manager.target_devs[1].location) 477 self.assertEqual(self.TEST_LOCATION2, atft_manager.target_devs[2].location) 478 479 def testSortDevicesLocation(self): 480 mock_serial_mapper = MagicMock() 481 mock_serial_instance = MagicMock() 482 mock_serial_mapper.return_value = mock_serial_instance 483 smap = { 484 self.TEST_SERIAL: self.TEST_LOCATION, 485 self.TEST_SERIAL2: self.TEST_LOCATION2, 486 self.TEST_SERIAL3: self.TEST_LOCATION3 487 } 488 mock_serial_instance.refresh_serial_map.side_effect = ( 489 lambda serial_map=smap: 
self.mockSetSerialMapper(serial_map)) 490 mock_serial_instance.get_location.side_effect = self.mockGetLocation 491 mock_fastboot = MagicMock() 492 mock_fastboot.side_effect = self.MockInit 493 atft_manager = atftman.AtftManager( 494 mock_fastboot, mock_serial_mapper, self.configs) 495 mock_fastboot.ListDevices.return_value = [ 496 self.TEST_SERIAL, self.TEST_SERIAL2, self.TEST_SERIAL3 497 ] 498 atft_manager.ListDevices(atft_manager.SORT_BY_LOCATION) 499 atft_manager.ListDevices(atft_manager.SORT_BY_LOCATION) 500 self.assertEqual(3, len(atft_manager.target_devs)) 501 self.assertEqual(self.TEST_LOCATION, atft_manager.target_devs[0].location) 502 self.assertEqual(self.TEST_LOCATION3, atft_manager.target_devs[1].location) 503 self.assertEqual(self.TEST_LOCATION2, atft_manager.target_devs[2].location) 504 505 def testSortDevicesSerial(self): 506 mock_serial_mapper = MagicMock() 507 mock_serial_instance = MagicMock() 508 mock_serial_mapper.return_value = mock_serial_instance 509 smap = { 510 self.TEST_SERIAL: self.TEST_LOCATION, 511 self.TEST_SERIAL2: self.TEST_LOCATION2, 512 self.TEST_SERIAL3: self.TEST_LOCATION3 513 } 514 mock_serial_instance.refresh_serial_map.side_effect = ( 515 lambda serial_map=smap: self.mockSetSerialMapper(serial_map)) 516 mock_serial_instance.get_location.side_effect = self.mockGetLocation 517 mock_fastboot = MagicMock() 518 mock_fastboot_instance = MagicMock() 519 mock_fastboot.side_effect = self.MockInit 520 mock_fastboot.return_value = mock_fastboot_instance 521 mock_fastboot_instance.GetVar = MagicMock() 522 atft_manager = atftman.AtftManager( 523 mock_fastboot, mock_serial_mapper, self.configs) 524 mock_fastboot.ListDevices.return_value = [ 525 self.TEST_SERIAL2, self.TEST_SERIAL3, self.TEST_SERIAL 526 ] 527 atft_manager.ListDevices(atft_manager.SORT_BY_SERIAL) 528 atft_manager.ListDevices(atft_manager.SORT_BY_SERIAL) 529 self.assertEqual(3, len(atft_manager.target_devs)) 530 self.assertEqual(self.TEST_SERIAL, 531 
atft_manager.target_devs[0].serial_number) 532 self.assertEqual(self.TEST_SERIAL2, 533 atft_manager.target_devs[1].serial_number) 534 self.assertEqual(self.TEST_SERIAL3, 535 atft_manager.target_devs[2].serial_number) 536 537 # Test AtftManager.TransferContent 538 539 @staticmethod 540 def _AppendFile(file_path): 541 files.append(file_path) 542 543 @staticmethod 544 def _CheckFile(file_path): 545 assert file_path in files 546 return True 547 548 @staticmethod 549 def _RemoveFile(file_path): 550 assert file_path in files 551 files.remove(file_path) 552 553 @patch('os.rmdir') 554 @patch('os.remove') 555 @patch('os.path.exists') 556 @patch('tempfile.mkdtemp') 557 @patch('uuid.uuid1') 558 @patch('__main__.AtftManTest.FastbootDeviceTemplate.Upload') 559 @patch('__main__.AtftManTest.FastbootDeviceTemplate.Download') 560 def testTransferContentNormal(self, mock_download, mock_upload, mock_uuid, 561 mock_create_folder, mock_exists, mock_remove, 562 mock_rmdir): 563 atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, 564 self.mock_serial_mapper, self.configs) 565 # upload (to fs): create a temporary file 566 mock_upload.side_effect = AtftManTest._AppendFile 567 # download (from fs): check if the temporary file exists 568 mock_download.side_effect = AtftManTest._CheckFile 569 mock_exists.side_effect = AtftManTest._CheckFile 570 # remove: remove the file 571 mock_remove.side_effect = AtftManTest._RemoveFile 572 mock_rmdir.side_effect = AtftManTest._RemoveFile 573 mock_create_folder.return_value = self.TEST_TMP_FOLDER 574 files.append(self.TEST_TMP_FOLDER) 575 mock_uuid.return_value = self.TEST_UUID 576 tmp_path = self.TEST_TMP_FOLDER + self.TEST_UUID 577 src = self.FastbootDeviceTemplate(self.TEST_SERIAL) 578 dst = self.FastbootDeviceTemplate(self.TEST_SERIAL) 579 atft_manager.TransferContent(src, dst) 580 src.Upload.assert_called_once_with(tmp_path) 581 src.Download.assert_called_once_with(tmp_path) 582 # we should have no temporary file at the end 583 
self.assertTrue(not files) 584 585 # Test AtftManager._ChooseAlgorithm 586 def testChooseAlgorithm(self): 587 atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, 588 self.mock_serial_mapper, self.configs) 589 p256 = atftman.EncryptionAlgorithm.ALGORITHM_P256 590 curve = atftman.EncryptionAlgorithm.ALGORITHM_CURVE25519 591 algorithm = atft_manager._ChooseAlgorithm([p256, curve]) 592 self.assertEqual(curve, algorithm) 593 594 def testChooseAlgorithmP256(self): 595 atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, 596 self.mock_serial_mapper, self.configs) 597 p256 = atftman.EncryptionAlgorithm.ALGORITHM_P256 598 algorithm = atft_manager._ChooseAlgorithm([p256]) 599 self.assertEqual(p256, algorithm) 600 601 def testChooseAlgorithmCurve(self): 602 atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, 603 self.mock_serial_mapper, self.configs) 604 curve = atftman.EncryptionAlgorithm.ALGORITHM_CURVE25519 605 algorithm = atft_manager._ChooseAlgorithm([curve]) 606 self.assertEqual(curve, algorithm) 607 608 def testChooseAlgorithmException(self): 609 atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, 610 self.mock_serial_mapper, self.configs) 611 with self.assertRaises(NoAlgorithmAvailableException): 612 atft_manager._ChooseAlgorithm([]) 613 614 def testChooseAlgorithmExceptionNoAvailable(self): 615 atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, 616 self.mock_serial_mapper, self.configs) 617 with self.assertRaises(NoAlgorithmAvailableException): 618 atft_manager._ChooseAlgorithm(['abcd']) 619 620 # Test AtftManager._GetAlgorithmList 621 def testGetAlgorithmList(self): 622 mock_target = MagicMock() 623 mock_target.GetVar.return_value = '1:p256,2:curve25519' 624 atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, 625 self.mock_serial_mapper, self.configs) 626 algorithm_list = atft_manager._GetAlgorithmList(mock_target) 627 self.assertEqual(2, len(algorithm_list)) 628 self.assertEqual(1, 
algorithm_list[0]) 629 self.assertEqual(2, algorithm_list[1]) 630 631 # Test DeviceInfo.__eq__ 632 def testDeviceInfoEqual(self): 633 test_device1 = atftman.DeviceInfo(None, self.TEST_SERIAL, 634 self.TEST_LOCATION) 635 test_device2 = atftman.DeviceInfo(None, self.TEST_SERIAL2, 636 self.TEST_LOCATION2) 637 test_device3 = atftman.DeviceInfo(None, self.TEST_SERIAL, 638 self.TEST_LOCATION2) 639 test_device4 = atftman.DeviceInfo(None, self.TEST_SERIAL2, 640 self.TEST_LOCATION) 641 test_device5 = atftman.DeviceInfo(None, self.TEST_SERIAL, 642 self.TEST_LOCATION) 643 self.assertEqual(test_device1, test_device5) 644 self.assertNotEqual(test_device1, test_device2) 645 self.assertNotEqual(test_device1, test_device3) 646 self.assertNotEqual(test_device1, test_device4) 647 648 # Test DeviceInfo.Copy 649 def testDeviceInfoCopy(self): 650 test_device1 = atftman.DeviceInfo(None, self.TEST_SERIAL, 651 self.TEST_LOCATION) 652 test_device2 = atftman.DeviceInfo(None, self.TEST_SERIAL2, 653 self.TEST_LOCATION2) 654 test_device3 = test_device1.Copy() 655 self.assertEqual(test_device3, test_device1) 656 self.assertNotEqual(test_device3, test_device2) 657 658 # Test AtfaDeviceManager.CheckStatus 659 def testCheckStatus(self): 660 atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, 661 self.mock_serial_mapper, self.configs) 662 mock_atfa_dev = MagicMock() 663 atft_manager.atfa_dev = mock_atfa_dev 664 test_atfa_device_manager = atftman.AtfaDeviceManager(atft_manager) 665 atft_manager.product_info = MagicMock() 666 atft_manager.product_info.product_id = self.TEST_ID 667 mock_atfa_dev.Oem.return_value = 'TEST\n(bootloader) 100\nTEST' 668 test_number = test_atfa_device_manager.CheckStatus() 669 mock_atfa_dev.Oem.assert_called_once_with('num-keys ' + self.TEST_ID, True) 670 self.assertEqual(100, atft_manager.GetATFAKeysLeft()) 671 672 def testCheckStatusCRLF(self): 673 atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, 674 self.mock_serial_mapper, self.configs) 675 
mock_atfa_dev = MagicMock() 676 atft_manager.atfa_dev = mock_atfa_dev 677 test_atfa_device_manager = atftman.AtfaDeviceManager(atft_manager) 678 atft_manager.product_info = MagicMock() 679 atft_manager.product_info.product_id = self.TEST_ID 680 mock_atfa_dev.Oem.return_value = 'TEST\r\n(bootloader) 100\r\nTEST' 681 test_number = test_atfa_device_manager.CheckStatus() 682 mock_atfa_dev.Oem.assert_called_once_with('num-keys ' + self.TEST_ID, True) 683 self.assertEqual(100, atft_manager.GetATFAKeysLeft()) 684 685 def testCheckStatusNoProductId(self): 686 atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, 687 self.mock_serial_mapper, self.configs) 688 mock_atfa_dev = MagicMock() 689 atft_manager.atfa_dev = mock_atfa_dev 690 atft_manager.product_info = None 691 test_atfa_device_manager = atftman.AtfaDeviceManager(atft_manager) 692 mock_atfa_dev.Oem.return_value = 'TEST\r\n(bootloader) 100\r\nTEST' 693 with self.assertRaises(ProductNotSpecifiedException): 694 test_atfa_device_manager.CheckStatus() 695 696 def testCheckStatusNoATFA(self): 697 atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, 698 self.mock_serial_mapper, self.configs) 699 atft_manager.atfa_dev = None 700 atft_manager.product_info = MagicMock() 701 atft_manager.product_info.product_id = self.TEST_ID 702 test_atfa_device_manager = atftman.AtfaDeviceManager(atft_manager) 703 with self.assertRaises(DeviceNotFoundException): 704 test_atfa_device_manager.CheckStatus() 705 706 def testCheckStatusInvalidFormat(self): 707 atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, 708 self.mock_serial_mapper, self.configs) 709 mock_atfa_dev = MagicMock() 710 atft_manager.atfa_dev = mock_atfa_dev 711 test_atfa_device_manager = atftman.AtfaDeviceManager(atft_manager) 712 atft_manager.product_info = MagicMock() 713 atft_manager.product_info.product_id = self.TEST_ID 714 mock_atfa_dev.Oem.return_value = 'TEST\nTEST' 715 with self.assertRaises(FastbootFailure): 716 
test_atfa_device_manager.CheckStatus() 717 718 def testCheckStatusInvalidNumber(self): 719 atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, 720 self.mock_serial_mapper, self.configs) 721 mock_atfa_dev = MagicMock() 722 atft_manager.atfa_dev = mock_atfa_dev 723 test_atfa_device_manager = atftman.AtfaDeviceManager(atft_manager) 724 atft_manager.product_info = MagicMock() 725 atft_manager.product_info.product_id = self.TEST_ID 726 mock_atfa_dev.Oem.return_value = 'TEST\n(bootloader) abcd\nTEST' 727 with self.assertRaises(FastbootFailure): 728 test_atfa_device_manager.CheckStatus() 729 730 # Test AtftManager.CheckProvisionStatus 731 def MockGetVar(self, variable): 732 return self.status_map.get(variable) 733 734 def testCheckProvisionStatus(self): 735 atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, 736 self.mock_serial_mapper, self.configs) 737 # All initial state 738 self.status_map = {} 739 self.status_map['at-vboot-state'] = ( 740 '(bootloader) bootloader-locked: 0\n' 741 '(bootloader) bootloader-min-versions: -1,0,3\n' 742 '(bootloader) avb-perm-attr-set: 0\n' 743 '(bootloader) avb-locked: 0\n' 744 '(bootloader) avb-unlock-disabled: 0\n' 745 '(bootloader) avb-min-versions: 0:1,1:1,2:1,4097 :2,4098:2\n') 746 self.status_map['at-attest-uuid'] = '' 747 mock_device = MagicMock() 748 mock_device.GetVar.side_effect = self.MockGetVar 749 atft_manager.CheckProvisionStatus(mock_device) 750 self.assertEqual(ProvisionStatus.IDLE, mock_device.provision_status) 751 752 # Attestation key provisioned 753 self.status_map['at-attest-uuid'] = self.TEST_UUID 754 atft_manager.CheckProvisionStatus(mock_device) 755 self.assertEqual(ProvisionStatus.PROVISION_SUCCESS, 756 mock_device.provision_status) 757 758 # AVB locked 759 self.status_map['at-vboot-state'] = ( 760 '(bootloader) bootloader-locked: 0\n' 761 '(bootloader) bootloader-min-versions: -1,0,3\n' 762 '(bootloader) avb-perm-attr-set: 0\n' 763 '(bootloader) avb-locked: 1\n' 764 '(bootloader) 
avb-unlock-disabled: 0\n' 765 '(bootloader) avb-min-versions: 0:1,1:1,2:1,4097 :2,4098:2\n') 766 self.status_map['at-attest-uuid'] = '' 767 atft_manager.CheckProvisionStatus(mock_device) 768 self.assertEqual(ProvisionStatus.LOCKAVB_SUCCESS, 769 mock_device.provision_status) 770 771 # Permanent attributes fused 772 self.status_map['at-vboot-state'] = ( 773 '(bootloader) bootloader-locked: 0\n' 774 '(bootloader) bootloader-min-versions: -1,0,3\n' 775 '(bootloader) avb-perm-attr-set: 1\n' 776 '(bootloader) avb-locked: 0\n' 777 '(bootloader) avb-unlock-disabled: 0\n' 778 '(bootloader) avb-min-versions: 0:1,1:1,2:1,4097 :2,4098:2\n') 779 self.status_map['at-attest-uuid'] = '' 780 atft_manager.CheckProvisionStatus(mock_device) 781 self.assertEqual(ProvisionStatus.FUSEATTR_SUCCESS, 782 mock_device.provision_status) 783 784 # Bootloader locked 785 self.status_map['at-vboot-state'] = ( 786 '(bootloader) bootloader-locked: 1\n' 787 '(bootloader) bootloader-min-versions: -1,0,3\n' 788 '(bootloader) avb-perm-attr-set: 0\n' 789 '(bootloader) avb-locked: 0\n' 790 '(bootloader) avb-unlock-disabled: 0\n' 791 '(bootloader) avb-min-versions: 0:1,1:1,2:1,4097 :2,4098:2\n') 792 self.status_map['at-attest-uuid'] = '' 793 atft_manager.CheckProvisionStatus(mock_device) 794 self.assertEqual(ProvisionStatus.FUSEVBOOT_SUCCESS, 795 mock_device.provision_status) 796 797 def testCheckProvisionStatusFormat(self): 798 atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, 799 self.mock_serial_mapper, self.configs) 800 # All initial state 801 self.status_map = {} 802 self.status_map['at-vboot-state'] = ( 803 '(bootloader) bootloader-locked=1\n' 804 '(bootloader) bootloader-min-versions=-1,0,3\n' 805 '(bootloader) avb-perm-attr-set=0\n' 806 '(bootloader) avb-locked=0\n' 807 '(bootloader) avb-unlock-disabled=0\n' 808 '(bootloader) avb-min-versions=0:1,1:1,2:1,4097 :2,4098:2\n') 809 self.status_map['at-attest-uuid'] = '' 810 mock_device = MagicMock() 811 mock_device.GetVar.side_effect = 
self.MockGetVar 812 atft_manager.CheckProvisionStatus(mock_device) 813 self.assertEqual(ProvisionStatus.FUSEVBOOT_SUCCESS, mock_device.provision_status) 814 self.assertEqual(True, mock_device.provision_state.bootloader_locked) 815 816 self.status_map['at-vboot-state'] = ( 817 '(bootloader) bootloader-locked:1\n' 818 '(bootloader) bootloader-min-versions:-1,0,3\n' 819 '(bootloader) avb-perm-attr-set:0\n' 820 '(bootloader) avb-locked:0\n' 821 '(bootloader) avb-unlock-disabled:0\n' 822 '(bootloader) avb-min-versions: 0:1,1:1,2:1,4097 :2,4098:2\n') 823 self.status_map['at-attest-uuid'] = '' 824 mock_device = MagicMock() 825 mock_device.GetVar.side_effect = self.MockGetVar 826 atft_manager.CheckProvisionStatus(mock_device) 827 self.assertEqual(ProvisionStatus.FUSEVBOOT_SUCCESS, mock_device.provision_status) 828 self.assertEqual(True, mock_device.provision_state.bootloader_locked) 829 830 self.status_map['at-vboot-state'] = ( 831 '(bootloader) bootloader-locked:\t1\n' 832 '(bootloader) bootloader-min-versions:\t-1,0,3\n' 833 '(bootloader) avb-perm-attr-set:\t0\n' 834 '(bootloader) avb-locked:\t0\n' 835 '(bootloader) avb-unlock-disabled:\t0\n' 836 '(bootloader) avb-min-versions:\t0:1,1:1,2:1,4097 :2,4098:2\n') 837 self.status_map['at-attest-uuid'] = '' 838 mock_device = MagicMock() 839 mock_device.GetVar.side_effect = self.MockGetVar 840 atft_manager.CheckProvisionStatus(mock_device) 841 self.assertEqual(ProvisionStatus.FUSEVBOOT_SUCCESS, mock_device.provision_status) 842 self.assertEqual(True, mock_device.provision_state.bootloader_locked) 843 844 def testCheckProvisionState(self): 845 atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, 846 self.mock_serial_mapper, self.configs) 847 # All initial state 848 self.status_map = {} 849 self.status_map['at-vboot-state'] = ( 850 '(bootloader) bootloader-locked: 0\n' 851 '(bootloader) bootloader-min-versions: -1,0,3\n' 852 '(bootloader) avb-perm-attr-set: 0\n' 853 '(bootloader) avb-locked: 0\n' 854 '(bootloader) 
avb-unlock-disabled: 0\n' 855 '(bootloader) avb-min-versions: 0:1,1:1,2:1,4097 :2,4098:2\n') 856 self.status_map['at-attest-uuid'] = '' 857 mock_device = MagicMock() 858 mock_device.GetVar.side_effect = self.MockGetVar 859 atft_manager.CheckProvisionStatus(mock_device) 860 self.assertEqual(False, mock_device.provision_state.bootloader_locked) 861 self.assertEqual(False, mock_device.provision_state.avb_perm_attr_set) 862 self.assertEqual(False, mock_device.provision_state.avb_locked) 863 self.assertEqual(False, mock_device.provision_state.provisioned) 864 865 # Attestation key provisioned 866 self.status_map['at-attest-uuid'] = self.TEST_UUID 867 atft_manager.CheckProvisionStatus(mock_device) 868 self.assertEqual(False, mock_device.provision_state.bootloader_locked) 869 self.assertEqual(False, mock_device.provision_state.avb_perm_attr_set) 870 self.assertEqual(False, mock_device.provision_state.avb_locked) 871 self.assertEqual(True, mock_device.provision_state.provisioned) 872 873 # AVB locked and attestation key provisioned 874 self.status_map['at-vboot-state'] = ( 875 '(bootloader) bootloader-locked: 0\n' 876 '(bootloader) bootloader-min-versions: -1,0,3\n' 877 '(bootloader) avb-perm-attr-set: 0\n' 878 '(bootloader) avb-locked: 1\n' 879 '(bootloader) avb-unlock-disabled: 0\n' 880 '(bootloader) avb-min-versions: 0:1,1:1,2:1,4097 :2,4098:2\n') 881 self.status_map['at-attest-uuid'] = self.TEST_UUID 882 atft_manager.CheckProvisionStatus(mock_device) 883 self.assertEqual(False, mock_device.provision_state.bootloader_locked) 884 self.assertEqual(False, mock_device.provision_state.avb_perm_attr_set) 885 self.assertEqual(True, mock_device.provision_state.avb_locked) 886 self.assertEqual(True, mock_device.provision_state.provisioned) 887 self.assertEqual(ProvisionStatus.PROVISION_SUCCESS, 888 mock_device.provision_status) 889 890 # Permanent attributes fused 891 self.status_map['at-vboot-state'] = ( 892 '(bootloader) bootloader-locked: 0\n' 893 '(bootloader) 
bootloader-min-versions: -1,0,3\n' 894 '(bootloader) avb-perm-attr-set: 1\n' 895 '(bootloader) avb-locked: 0\n' 896 '(bootloader) avb-unlock-disabled: 0\n' 897 '(bootloader) avb-min-versions: 0:1,1:1,2:1,4097 :2,4098:2\n') 898 self.status_map['at-attest-uuid'] = '' 899 atft_manager.CheckProvisionStatus(mock_device) 900 self.assertEqual(False, mock_device.provision_state.bootloader_locked) 901 self.assertEqual(True, mock_device.provision_state.avb_perm_attr_set) 902 self.assertEqual(False, mock_device.provision_state.avb_locked) 903 self.assertEqual(False, mock_device.provision_state.provisioned) 904 905 # All status set 906 self.status_map['at-vboot-state'] = ( 907 '(bootloader) bootloader-locked: 1\n' 908 '(bootloader) bootloader-min-versions: -1,0,3\n' 909 '(bootloader) avb-perm-attr-set: 1\n' 910 '(bootloader) avb-locked: 1\n' 911 '(bootloader) avb-unlock-disabled: 0\n' 912 '(bootloader) avb-min-versions: 0:1,1:1,2:1,4097 :2,4098:2\n') 913 self.status_map['at-attest-uuid'] = self.TEST_UUID 914 atft_manager.CheckProvisionStatus(mock_device) 915 self.assertEqual(True, mock_device.provision_state.bootloader_locked) 916 self.assertEqual(True, mock_device.provision_state.avb_perm_attr_set) 917 self.assertEqual(True, mock_device.provision_state.avb_locked) 918 self.assertEqual(True, mock_device.provision_state.provisioned) 919 self.assertEqual(ProvisionStatus.PROVISION_SUCCESS, 920 mock_device.provision_status) 921 922 # Test AtftManager.Provision 923 def MockSetProvisionSuccess(self, target): 924 target.provision_status = ProvisionStatus.PROVISION_SUCCESS 925 target.provision_state = ProvisionState() 926 target.provision_state.provisioned = True 927 928 def MockSetProvisionFail(self, target): 929 target.provision_status = ProvisionStatus.PROVISION_FAILED 930 target.provision_state = ProvisionState() 931 target.provision_state.provisioned = False 932 933 def testProvision(self): 934 atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, 935 
self.mock_serial_mapper, self.configs) 936 mock_atfa = MagicMock() 937 mock_target = MagicMock() 938 atft_manager._atfa_dev_manager = MagicMock() 939 atft_manager.atfa_dev = mock_atfa 940 atft_manager._GetAlgorithmList = MagicMock() 941 atft_manager._GetAlgorithmList.return_value = [ 942 EncryptionAlgorithm.ALGORITHM_CURVE25519 943 ] 944 atft_manager.TransferContent = MagicMock() 945 atft_manager.CheckProvisionStatus = MagicMock() 946 atft_manager.CheckProvisionStatus.side_effect = self.MockSetProvisionSuccess 947 948 atft_manager.Provision(mock_target) 949 950 # Make sure atfa.SetTime is called. 951 atft_manager._atfa_dev_manager.SetTime.assert_called_once() 952 # Transfer content should be ATFA->target, target->ATFA, ATFA->target 953 transfer_content_calls = [ 954 call(mock_atfa, mock_target), 955 call(mock_target, mock_atfa), 956 call(mock_atfa, mock_target) 957 ] 958 atft_manager.TransferContent.assert_has_calls(transfer_content_calls) 959 atfa_oem_calls = [ 960 call('atfa-start-provisioning ' + 961 str(EncryptionAlgorithm.ALGORITHM_CURVE25519)), 962 call('atfa-finish-provisioning') 963 ] 964 target_oem_calls = [ 965 call('at-get-ca-request'), 966 call('at-set-ca-response') 967 ] 968 mock_atfa.Oem.assert_has_calls(atfa_oem_calls) 969 mock_target.Oem.assert_has_calls(target_oem_calls) 970 971 def testProvisionFailed(self): 972 atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, 973 self.mock_serial_mapper, self.configs) 974 mock_atfa = MagicMock() 975 mock_target = MagicMock() 976 atft_manager._atfa_dev_manager = MagicMock() 977 atft_manager.atfa_dev = mock_atfa 978 atft_manager._GetAlgorithmList = MagicMock() 979 atft_manager._GetAlgorithmList.return_value = [ 980 EncryptionAlgorithm.ALGORITHM_CURVE25519 981 ] 982 atft_manager.TransferContent = MagicMock() 983 atft_manager.CheckProvisionStatus = MagicMock() 984 atft_manager.CheckProvisionStatus.side_effect = self.MockSetProvisionFail 985 with self.assertRaises(FastbootFailure): 986 
atft_manager.Provision(mock_target) 987 988 # Test AtftManager.FuseVbootKey 989 def MockSetFuseVbootSuccess(self, target): 990 target.provision_status = ProvisionStatus.FUSEVBOOT_SUCCESS 991 target.provision_state = ProvisionState() 992 target.provision_state.bootloader_locked = True 993 994 def MockSetFuseVbootFail(self, target): 995 target.provision_status = ProvisionStatus.FUSEVBOOT_FAILED 996 target.provision_state = ProvisionState() 997 target.provision_state.bootloader_locked = False 998 999 @patch('os.remove') 1000 @patch('tempfile.NamedTemporaryFile') 1001 def testFuseVbootKey(self, mock_create_temp_file, mock_remove): 1002 mock_file = MagicMock() 1003 mock_create_temp_file.return_value = mock_file 1004 mock_file.name = self.TEST_FILE_NAME 1005 1006 atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, 1007 self.mock_serial_mapper, self.configs) 1008 atft_manager.product_info = ProductInfo( 1009 self.TEST_ID, self.TEST_NAME, self.TEST_ATTRIBUTE_ARRAY, 1010 self.TEST_VBOOT_KEY_ARRAY) 1011 mock_target = MagicMock() 1012 atft_manager.CheckProvisionStatus = MagicMock() 1013 atft_manager.CheckProvisionStatus.side_effect = self.MockSetFuseVbootSuccess 1014 1015 atft_manager.FuseVbootKey(mock_target) 1016 1017 mock_file.write.assert_called_once_with(self.TEST_VBOOT_KEY_ARRAY) 1018 mock_target.Download.assert_called_once_with(self.TEST_FILE_NAME) 1019 mock_remove.assert_called_once_with(self.TEST_FILE_NAME) 1020 mock_target.Oem.assert_called_once_with('fuse at-bootloader-vboot-key') 1021 1022 @patch('os.remove') 1023 @patch('tempfile.NamedTemporaryFile') 1024 def testFuseVbootKeyFailed(self, mock_create_temp_file, _): 1025 mock_file = MagicMock() 1026 mock_create_temp_file.return_value = mock_file 1027 mock_file.name = self.TEST_FILE_NAME 1028 1029 atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, 1030 self.mock_serial_mapper, self.configs) 1031 atft_manager.product_info = ProductInfo( 1032 self.TEST_ID, self.TEST_NAME, 
self.TEST_ATTRIBUTE_ARRAY, 1033 self.TEST_VBOOT_KEY_ARRAY) 1034 mock_target = MagicMock() 1035 atft_manager.CheckProvisionStatus = MagicMock() 1036 atft_manager.CheckProvisionStatus.side_effect = self.MockSetFuseVbootFail 1037 with self.assertRaises(FastbootFailure): 1038 atft_manager.FuseVbootKey(mock_target) 1039 1040 def testFuseVbootKeyNoProduct(self): 1041 atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, 1042 self.mock_serial_mapper, self.configs) 1043 mock_target = MagicMock() 1044 atft_manager.product_info = None 1045 with self.assertRaises(ProductNotSpecifiedException): 1046 atft_manager.FuseVbootKey(mock_target) 1047 1048 @patch('os.remove') 1049 @patch('tempfile.NamedTemporaryFile') 1050 def testFuseVbootKeyFastbootFailure(self, mock_create_temp_file, _): 1051 mock_file = MagicMock() 1052 mock_create_temp_file.return_value = mock_file 1053 mock_file.name = self.TEST_FILE_NAME 1054 1055 atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, 1056 self.mock_serial_mapper, self.configs) 1057 atft_manager.product_info = ProductInfo( 1058 self.TEST_ID, self.TEST_NAME, self.TEST_ATTRIBUTE_ARRAY, 1059 self.TEST_VBOOT_KEY_ARRAY) 1060 mock_target = MagicMock() 1061 atft_manager.CheckProvisionStatus = MagicMock() 1062 mock_target.Oem.side_effect = FastbootFailure('') 1063 1064 with self.assertRaises(FastbootFailure): 1065 atft_manager.FuseVbootKey(mock_target) 1066 self.assertEqual( 1067 ProvisionStatus.FUSEVBOOT_FAILED, mock_target.provision_status) 1068 1069 # Test AtftManager.FusePermAttr 1070 def MockSetFuseAttrSuccess(self, target): 1071 target.provision_status = ProvisionStatus.FUSEATTR_SUCCESS 1072 target.provision_state = ProvisionState() 1073 target.provision_state.avb_perm_attr_set = True 1074 1075 def MockSetFuseAttrFail(self, target): 1076 target.provision_status = ProvisionStatus.FUSEATTR_FAILED 1077 target.provision_state = ProvisionState() 1078 target.provision_state.avb_perm_attr_set = False 1079 1080 @patch('os.remove') 1081 
@patch('tempfile.NamedTemporaryFile') 1082 def testFusePermAttr(self, mock_create_temp_file, mock_remove): 1083 mock_file = MagicMock() 1084 mock_create_temp_file.return_value = mock_file 1085 mock_file.name = self.TEST_FILE_NAME 1086 1087 atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, 1088 self.mock_serial_mapper, self.configs) 1089 atft_manager.product_info = ProductInfo( 1090 self.TEST_ID, self.TEST_NAME, self.TEST_ATTRIBUTE_ARRAY, 1091 self.TEST_VBOOT_KEY_ARRAY) 1092 mock_target = MagicMock() 1093 atft_manager.CheckProvisionStatus = MagicMock() 1094 atft_manager.CheckProvisionStatus.side_effect = self.MockSetFuseAttrSuccess 1095 1096 atft_manager.FusePermAttr(mock_target) 1097 1098 mock_file.write.assert_called_once_with(self.TEST_ATTRIBUTE_ARRAY) 1099 mock_target.Download.assert_called_once_with(self.TEST_FILE_NAME) 1100 mock_remove.assert_called_once_with(self.TEST_FILE_NAME) 1101 mock_target.Oem.assert_called_once_with('fuse at-perm-attr') 1102 1103 @patch('os.remove') 1104 @patch('tempfile.NamedTemporaryFile') 1105 def testFusePermAttrFail(self, mock_create_temp_file, _): 1106 mock_file = MagicMock() 1107 mock_create_temp_file.return_value = mock_file 1108 mock_file.name = self.TEST_FILE_NAME 1109 1110 atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, 1111 self.mock_serial_mapper, self.configs) 1112 atft_manager.product_info = ProductInfo( 1113 self.TEST_ID, self.TEST_NAME, self.TEST_ATTRIBUTE_ARRAY, 1114 self.TEST_VBOOT_KEY_ARRAY) 1115 mock_target = MagicMock() 1116 atft_manager.CheckProvisionStatus = MagicMock() 1117 atft_manager.CheckProvisionStatus.side_effect = self.MockSetFuseAttrFail 1118 with self.assertRaises(FastbootFailure): 1119 atft_manager.FusePermAttr(mock_target) 1120 1121 def testFusePermAttrNoProduct(self): 1122 atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, 1123 self.mock_serial_mapper, self.configs) 1124 mock_target = MagicMock() 1125 atft_manager.product_info = None 1126 with 
self.assertRaises(ProductNotSpecifiedException): 1127 atft_manager.FusePermAttr(mock_target) 1128 1129 @patch('os.remove') 1130 @patch('tempfile.NamedTemporaryFile') 1131 def testFusePermAttrFastbootFailure(self, mock_create_temp_file, _): 1132 mock_file = MagicMock() 1133 mock_create_temp_file.return_value = mock_file 1134 mock_file.name = self.TEST_FILE_NAME 1135 1136 atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, 1137 self.mock_serial_mapper, self.configs) 1138 atft_manager.product_info = ProductInfo( 1139 self.TEST_ID, self.TEST_NAME, self.TEST_ATTRIBUTE_ARRAY, 1140 self.TEST_VBOOT_KEY_ARRAY) 1141 mock_target = MagicMock() 1142 atft_manager.CheckProvisionStatus = MagicMock() 1143 mock_target.Oem.side_effect = FastbootFailure('') 1144 1145 with self.assertRaises(FastbootFailure): 1146 atft_manager.FusePermAttr(mock_target) 1147 self.assertEqual( 1148 ProvisionStatus.FUSEATTR_FAILED, mock_target.provision_status) 1149 1150 # Test AtftManager.LockAvb 1151 def MockSetLockAvbSuccess(self, target): 1152 target.provision_status = ProvisionStatus.LOCKAVB_SUCCESS 1153 target.provision_state = ProvisionState() 1154 target.provision_state.avb_locked = True 1155 1156 def MockSetLockAvbFail(self, target): 1157 target.provision_status = ProvisionStatus.LOCKAVB_FAILED 1158 target.provision_state = ProvisionState() 1159 target.provision_state.avb_locked = False 1160 1161 def testLockAvb(self): 1162 atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, 1163 self.mock_serial_mapper, self.configs) 1164 mock_target = MagicMock() 1165 atft_manager.CheckProvisionStatus = MagicMock() 1166 atft_manager.CheckProvisionStatus.side_effect = self.MockSetLockAvbSuccess 1167 atft_manager.LockAvb(mock_target) 1168 mock_target.Oem.assert_called_once_with('at-lock-vboot') 1169 self.assertEqual( 1170 ProvisionStatus.LOCKAVB_SUCCESS, mock_target.provision_status) 1171 1172 def testLockAvbFail(self): 1173 atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, 
1174 self.mock_serial_mapper, self.configs) 1175 mock_target = MagicMock() 1176 atft_manager.CheckProvisionStatus = MagicMock() 1177 atft_manager.CheckProvisionStatus.side_effect = self.MockSetLockAvbFail 1178 with self.assertRaises(FastbootFailure): 1179 atft_manager.LockAvb(mock_target) 1180 1181 def testLockAvbFastbootFailure(self): 1182 atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, 1183 self.mock_serial_mapper, self.configs) 1184 mock_target = MagicMock() 1185 atft_manager.CheckProvisionStatus = MagicMock() 1186 atft_manager.CheckProvisionStatus.side_effect = self.MockSetLockAvbSuccess 1187 mock_target.Oem.side_effect = FastbootFailure('') 1188 with self.assertRaises(FastbootFailure): 1189 atft_manager.LockAvb(mock_target) 1190 self.assertEqual( 1191 ProvisionStatus.LOCKAVB_FAILED, mock_target.provision_status) 1192 1193 # Test AtftManager.Reboot 1194 class MockTimer(object): 1195 def __init__(self, interval, callback): 1196 self.interval = interval 1197 self.callback = callback 1198 1199 def start(self): 1200 pass 1201 1202 def refresh(self): 1203 if self.callback: 1204 self.callback() 1205 1206 def cancel(self): 1207 self.callback = None 1208 1209 def mock_create_timer(self, interval, callback): 1210 self.mock_timer_instance = self.MockTimer(interval, callback) 1211 return self.mock_timer_instance 1212 1213 @patch('threading.Timer') 1214 def testRebootSuccess(self, mock_timer): 1215 self.mock_timer_instance = None 1216 atft_manager = atftman.AtftManager( 1217 self.FastbootDeviceTemplate, self.mock_serial_mapper, self.configs) 1218 timeout = 1 1219 atft_manager.stable_serials = [self.TEST_SERIAL] 1220 mock_fastboot = MagicMock() 1221 test_device = atftman.DeviceInfo( 1222 mock_fastboot, self.TEST_SERIAL, self.TEST_LOCATION) 1223 1224 atft_manager.target_devs.append(test_device) 1225 mock_success = MagicMock() 1226 mock_fail = MagicMock() 1227 mock_timer.side_effect = self.mock_create_timer 1228 1229 atft_manager.Reboot(test_device, timeout, 
mock_success, mock_fail) 1230 1231 # During the reboot, the status should be REBOOT_ING. 1232 self.assertEqual(1, len(atft_manager.target_devs)) 1233 self.assertEqual( 1234 ProvisionStatus.REBOOT_ING, 1235 atft_manager.target_devs[0].provision_status) 1236 self.assertEqual( 1237 self.TEST_SERIAL, atft_manager.target_devs[0].serial_number) 1238 1239 # After the device reappear, the status should be REBOOT_SUCCESS. 1240 atft_manager.stable_serials = [self.TEST_SERIAL] 1241 atft_manager._HandleRebootCallbacks() 1242 # mock timeout event. 1243 self.mock_timer_instance.refresh() 1244 1245 self.assertEqual(1, len(atft_manager.target_devs)) 1246 self.assertEqual( 1247 ProvisionStatus.REBOOT_SUCCESS, 1248 atft_manager.target_devs[0].provision_status) 1249 mock_fastboot.Reboot.assert_called_once() 1250 1251 # Success should be called, fail should not. 1252 mock_success.assert_called_once() 1253 mock_fail.assert_not_called() 1254 1255 @patch('threading.Timer') 1256 def testRebootTimeout(self, mock_timer): 1257 self.mock_timer_instance = None 1258 atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, 1259 self.mock_serial_mapper, self.configs) 1260 timeout = 1 1261 atft_manager.stable_serials.append(self.TEST_SERIAL) 1262 mock_fastboot = MagicMock() 1263 test_device = atftman.DeviceInfo( 1264 mock_fastboot, self.TEST_SERIAL, self.TEST_LOCATION) 1265 atft_manager.target_devs.append(test_device) 1266 mock_success = MagicMock() 1267 mock_fail = MagicMock() 1268 # Status would be checked after reboot. We assume it's in FUSEVBOOT_SUCCESS 1269 atft_manager.CheckProvisionStatus = MagicMock() 1270 atft_manager.CheckProvisionStatus.side_effect = self.MockSetFuseVbootSuccess 1271 mock_timer.side_effect = self.mock_create_timer 1272 1273 atft_manager.Reboot(test_device, timeout, mock_success, mock_fail) 1274 atft_manager.stable_serials = [] 1275 1276 atft_manager._HandleRebootCallbacks() 1277 1278 # mock timeout event. 
1279 self.mock_timer_instance.refresh() 1280 1281 self.assertEqual(0, len(atft_manager.target_devs)) 1282 mock_success.assert_not_called() 1283 mock_fail.assert_called_once() 1284 1285 @patch('threading.Timer') 1286 def testRebootTimeoutBeforeRefresh(self, mock_timer): 1287 self.mock_timer_instance = None 1288 atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, 1289 self.mock_serial_mapper, self.configs) 1290 timeout = 1 1291 atft_manager.stable_serials.append(self.TEST_SERIAL) 1292 mock_fastboot = MagicMock() 1293 test_device = atftman.DeviceInfo( 1294 mock_fastboot, self.TEST_SERIAL, self.TEST_LOCATION) 1295 atft_manager.target_devs.append(test_device) 1296 mock_success = MagicMock() 1297 mock_fail = MagicMock() 1298 # Status would be checked after reboot. We assume it's in FUSEVBOOT_SUCCESS 1299 atft_manager.CheckProvisionStatus = MagicMock() 1300 atft_manager.CheckProvisionStatus.side_effect = self.MockSetFuseVbootSuccess 1301 mock_timer.side_effect = self.mock_create_timer 1302 1303 atft_manager.Reboot(test_device, timeout, mock_success, mock_fail) 1304 atft_manager.stable_serials = [] 1305 # mock timeout event. 1306 self.mock_timer_instance.refresh() 1307 # mock refresh event. 
1308 atft_manager._HandleRebootCallbacks() 1309 1310 self.assertEqual(0, len(atft_manager.target_devs)) 1311 mock_success.assert_not_called() 1312 mock_fail.assert_called_once() 1313 1314 @patch('threading.Timer') 1315 def testRebootFailure(self, mock_timer): 1316 self.mock_timer_instance = None 1317 atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, 1318 self.mock_serial_mapper, self.configs) 1319 mock_target = MagicMock() 1320 mock_target.serial_number = self.TEST_SERIAL 1321 timeout = 1 1322 atft_manager.stable_serials.append(self.TEST_SERIAL) 1323 test_device = atftman.DeviceInfo(None, self.TEST_SERIAL, self.TEST_LOCATION) 1324 atft_manager.target_devs.append(test_device) 1325 mock_success = MagicMock() 1326 mock_fail = MagicMock() 1327 # Status would be checked after reboot. We assume it's in FUSEVBOOT_SUCCESS 1328 atft_manager.CheckProvisionStatus = MagicMock() 1329 atft_manager.CheckProvisionStatus.side_effect = self.MockSetFuseVbootSuccess 1330 mock_timer.side_effect = self.mock_create_timer 1331 mock_target.Reboot.side_effect = FastbootFailure('') 1332 1333 with self.assertRaises(FastbootFailure): 1334 atft_manager.Reboot(mock_target, timeout, mock_success, mock_fail) 1335 1336 # There should be no timeout timer. 1337 self.assertEqual(None, self.mock_timer_instance) 1338 # mock refresh event. 
1339 atft_manager._HandleRebootCallbacks() 1340 mock_success.assert_not_called() 1341 mock_fail.assert_not_called() 1342 1343 # Test AtftManager.ProcessProductAttributesFile 1344 def testProcessProductAttributesFile(self): 1345 test_content = ( 1346 '{' 1347 ' "productName": "%s",' 1348 ' "productConsoleId": "%s",' 1349 ' "productPermanentAttribute": "%s",' 1350 ' "bootloaderPublicKey": "%s",' 1351 ' "creationTime": ""' 1352 '}') % (self.TEST_NAME, self.TEST_ID, self.TEST_ATTRIBUTE_STRING, 1353 self.TEST_VBOOT_KEY_STRING) 1354 atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, 1355 self.mock_serial_mapper, self.configs) 1356 atft_manager.ProcessProductAttributesFile(test_content) 1357 self.assertEqual(self.TEST_NAME, atft_manager.product_info.product_name) 1358 self.assertEqual(self.TEST_ID, atft_manager.product_info.product_id) 1359 self.assertEqual(self.TEST_ATTRIBUTE_ARRAY, 1360 atft_manager.product_info.product_attributes) 1361 self.assertEqual(self.TEST_VBOOT_KEY_ARRAY, 1362 atft_manager.product_info.vboot_key) 1363 1364 def testProcessProductAttributesFileWrongJSON(self): 1365 test_content = ( 1366 '{' 1367 ' "productName": "%s",' 1368 ' "productConsoleId": "%s",' 1369 ' "productPermanentAttribute": "%s",' 1370 ' "bootloaderPublicKey": "%s",' 1371 ' "creationTime": ""' 1372 '') % (self.TEST_NAME, self.TEST_ID, self.TEST_ATTRIBUTE_STRING, 1373 self.TEST_VBOOT_KEY_STRING) 1374 atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, 1375 self.mock_serial_mapper, self.configs) 1376 with self.assertRaises(ProductAttributesFileFormatError): 1377 atft_manager.ProcessProductAttributesFile(test_content) 1378 1379 def testProcessProductAttributesFileMissingField(self): 1380 test_content = ( 1381 '{' 1382 ' "productConsoleId": "%s",' 1383 ' "productPermanentAttribute": "%s",' 1384 ' "bootloaderPublicKey": "%s",' 1385 ' "creationTime": ""' 1386 '}') % (self.TEST_ID, self.TEST_ATTRIBUTE_STRING, 1387 self.TEST_VBOOT_KEY_STRING) 1388 atft_manager = 
atftman.AtftManager(self.FastbootDeviceTemplate, 1389 self.mock_serial_mapper, self.configs) 1390 with self.assertRaises(ProductAttributesFileFormatError): 1391 atft_manager.ProcessProductAttributesFile(test_content) 1392 1393 def testProcessProductAttributesFileWrongLength(self): 1394 test_content = ( 1395 '{' 1396 ' "productName": "%s",' 1397 ' "productConsoleId": "%s",' 1398 ' "productPermanentAttribute": "%s",' 1399 ' "bootloaderPublicKey": "%s",' 1400 ' "creationTime": ""' 1401 '}') % (self.TEST_NAME, self.TEST_ID, 1402 base64.standard_b64encode(bytearray(1053)), 1403 self.TEST_VBOOT_KEY_STRING) 1404 atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, 1405 self.mock_serial_mapper, self.configs) 1406 with self.assertRaises(ProductAttributesFileFormatError) as e: 1407 atft_manager.ProcessProductAttributesFile(test_content) 1408 1409 def testProcessProductAttributesFileWrongBase64(self): 1410 test_content = ( 1411 '{' 1412 ' "productName": "%s",' 1413 ' "productConsoleId": "%s",' 1414 ' "productPermanentAttribute": "%s",' 1415 ' "bootloaderPublicKey": "%s",' 1416 ' "creationTime": ""' 1417 '}') % (self.TEST_NAME, self.TEST_ID, self.TEST_ATTRIBUTE_STRING, 1418 '12') 1419 atft_manager = atftman.AtftManager(self.FastbootDeviceTemplate, 1420 self.mock_serial_mapper, self.configs) 1421 with self.assertRaises(ProductAttributesFileFormatError): 1422 atft_manager.ProcessProductAttributesFile(test_content) 1423 1424if __name__ == '__main__': 1425 unittest.main() 1426