1#!/usr/bin/python 2# -*- coding: utf-8 -*- 3# Copyright 2017 The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17"""AT-Factory-Tool manager module. 18 19This module provides the logical implementation of the graphical tool for 20managing the ATFA and AT communication. 21""" 22import base64 23from datetime import datetime 24import json 25import os 26import re 27import tempfile 28import threading 29import uuid 30 31from fastboot_exceptions import DeviceNotFoundException 32from fastboot_exceptions import FastbootFailure 33from fastboot_exceptions import NoAlgorithmAvailableException 34from fastboot_exceptions import ProductAttributesFileFormatError 35from fastboot_exceptions import ProductNotSpecifiedException 36 37BOOTLOADER_STRING = '(bootloader) ' 38 39 40class EncryptionAlgorithm(object): 41 """The support encryption algorithm constant.""" 42 ALGORITHM_P256 = 1 43 ALGORITHM_CURVE25519 = 2 44 45 46class ProvisionStatus(object): 47 """The provision status constant.""" 48 _PROCESSING = 0 49 _SUCCESS = 1 50 _FAILED = 2 51 52 IDLE = 0 53 WAITING = 1 54 FUSEVBOOT_ING = (10 + _PROCESSING) 55 FUSEVBOOT_SUCCESS = (10 + _SUCCESS) 56 FUSEVBOOT_FAILED = (10 + _FAILED) 57 REBOOT_ING = (20 + _PROCESSING) 58 REBOOT_SUCCESS = (20 + _SUCCESS) 59 REBOOT_FAILED = (20 + _FAILED) 60 FUSEATTR_ING = (30 + _PROCESSING) 61 FUSEATTR_SUCCESS = (30 + _SUCCESS) 62 FUSEATTR_FAILED = (30 + _FAILED) 63 LOCKAVB_ING = (40 + _PROCESSING) 64 LOCKAVB_SUCCESS = (40 + _SUCCESS) 65 LOCKAVB_FAILED = (40 + _FAILED) 66 PROVISION_ING = (50 + _PROCESSING) 67 PROVISION_SUCCESS = (50 + _SUCCESS) 68 PROVISION_FAILED = ( + _FAILED) 69 70 STRING_MAP = { 71 IDLE : ['Idle', '初始'], 72 WAITING : ['Waiting', '等待'], 73 FUSEVBOOT_ING : ['Fusing Bootloader Vboot Key...', '烧录引导密钥中...'], 74 FUSEVBOOT_SUCCESS : ['Bootloader Locked', '烧录引导密钥成功'], 75 FUSEVBOOT_FAILED : ['Fuse Bootloader Vboot Key Failed', '烧录引导密钥失败'], 76 REBOOT_ING : ['Rebooting Device To Check Vboot Key...', 77 '重启设备中...'], 78 REBOOT_SUCCESS : ['Bootloader Verified Boot Checked', '重启设备成功'], 79 REBOOT_FAILED : ['Reboot Device Failed', '重启设备失败'], 80 FUSEATTR_ING : ['Fusing Permanent Attributes', '烧录产品信息中...'], 81 FUSEATTR_SUCCESS : ['Permanent Attributes Fused', '烧录产品信息成功'], 82 FUSEATTR_FAILED : ['Fuse Permanent Attributes Failed', '烧录产品信息失败'], 83 LOCKAVB_ING : ['Locking Android Verified Boot', '锁定AVB中...'], 84 LOCKAVB_SUCCESS : ['Android Verified Boot Locked', '锁定AVB成功'], 85 LOCKAVB_FAILED : ['Lock Android Verified Boot Failed', '锁定AVB失败'], 86 PROVISION_ING : ['Provisioning Attestation Key', '传输密钥中...'], 87 PROVISION_SUCCESS : ['Attestation Key Provisioned', '传输密钥成功'], 88 PROVISION_FAILED : ['Provision Attestation Key Failed', '传输密钥失败'] 89 90 } 91 92 @staticmethod 93 def ToString(provision_status, language_index): 94 return ProvisionStatus.STRING_MAP[provision_status][language_index] 95 96 @staticmethod 97 def isSuccess(provision_status): 98 return provision_status % 10 == ProvisionStatus._SUCCESS 99 100 @staticmethod 101 def isProcessing(provision_status): 102 return provision_status % 10 == ProvisionStatus._PROCESSING 103 104 @staticmethod 105 def isFailed(provision_status): 106 return provision_status % 10 == ProvisionStatus._FAILED 107 108 109class ProvisionState(object): 110 """The provision state of the target device.""" 111 bootloader_locked = False 112 avb_perm_attr_set = False 113 avb_locked = False 114 provisioned = False 115 116 117class ProductInfo(object): 118 """The information about a product. 119 120 Attributes: 121 product_id: The id for the product. 122 product_name: The name for the product. 123 product_attributes: The byte array of the product permanent attributes. 124 """ 125 126 def __init__(self, product_id, product_name, product_attributes, vboot_key): 127 self.product_id = product_id 128 self.product_name = product_name 129 self.product_attributes = product_attributes 130 self.vboot_key = vboot_key 131 132 133class DeviceInfo(object): 134 """The class to wrap the information about a fastboot device. 135 136 Attributes: 137 serial_number: The serial number for the device. 138 location: The physical USB location for the device. 139 """ 140 141 def __init__(self, _fastboot_device_controller, serial_number, 142 location=None, provision_status=ProvisionStatus.IDLE, 143 provision_state=ProvisionState()): 144 self._fastboot_device_controller = _fastboot_device_controller 145 self.serial_number = serial_number 146 self.location = location 147 # The provision status and provision state is only meaningful for target 148 # device. 149 self.provision_status = provision_status 150 self.provision_state = provision_state 151 # The number of attestation keys left for the selected product. This 152 # attribute is only meaning for ATFA device. 153 self.keys_left = None 154 155 def Copy(self): 156 return DeviceInfo(None, self.serial_number, self.location, 157 self.provision_status) 158 159 def Reboot(self): 160 return self._fastboot_device_controller.Reboot() 161 162 def Oem(self, oem_command, err_to_out=False): 163 return self._fastboot_device_controller.Oem(oem_command, err_to_out) 164 165 def Flash(self, partition, file_path): 166 return self._fastboot_device_controller.Flash(partition, file_path) 167 168 def Upload(self, file_path): 169 return self._fastboot_device_controller.Upload(file_path) 170 171 def Download(self, file_path): 172 return self._fastboot_device_controller.Download(file_path) 173 174 def GetVar(self, var): 175 return self._fastboot_device_controller.GetVar(var) 176 177 def __eq__(self, other): 178 return (self.serial_number == other.serial_number and 179 self.location == other.location and 180 self.provision_status == other.provision_status) 181 182 def __ne__(self, other): 183 return not self.__eq__(other) 184 185 def __str__(self): 186 if self.location: 187 return self.serial_number + ' at location: ' + self.location 188 else: 189 return self.serial_number 190 191 192class RebootCallback(object): 193 """The class to handle reboot success and timeout callbacks.""" 194 195 def __init__( 196 self, timeout, success_callback, timeout_callback): 197 """Initiate a reboot callback handler class. 198 199 Args: 200 timeout: How much time to wait for the device to reappear. 201 success_callback: The callback to be called if the device reappear 202 before timeout. 203 timeout_callback: The callback to be called if the device doesn't reappear 204 before timeout. 205 """ 206 self.success = success_callback 207 self.fail = timeout_callback 208 # Lock to make sure only one callback is called. (either success or timeout) 209 # This lock can only be obtained once. 210 self.lock = threading.Lock() 211 self.timer = threading.Timer(timeout, self._TimeoutCallback) 212 self.timer.start() 213 214 def _TimeoutCallback(self): 215 """The function to handle timeout callback. 216 217 Call the timeout_callback that is registered. 218 """ 219 if self.lock and self.lock.acquire(False): 220 self.fail() 221 222 def Release(self): 223 lock = self.lock 224 timer = self.timer 225 self.lock = None 226 self.timer = None 227 lock.release() 228 timer.cancel() 229 230 231class AtftManager(object): 232 """The manager to implement ATFA tasks. 233 234 Attributes: 235 atfa_dev: A FastbootDevice object identifying the detected ATFA device. 236 target_dev: A FastbootDevice object identifying the AT device 237 to be provisioned. 238 """ 239 SORT_BY_SERIAL = 0 240 SORT_BY_LOCATION = 1 241 # The length of the permanent attribute should be 1052. 242 EXPECTED_ATTRIBUTE_LENGTH = 1052 243 244 # The Permanent Attribute File JSON Key Names: 245 JSON_PRODUCT_NAME = 'productName' 246 JSON_PRODUCT_ATTRIBUTE = 'productPermanentAttribute' 247 JSON_PRODUCT_ATTRIBUTE = 'productPermanentAttribute' 248 JSON_VBOOT_KEY = 'bootloaderPublicKey' 249 250 def __init__(self, fastboot_device_controller, serial_mapper, configs): 251 """Initialize attributes and store the supplied fastboot_device_controller. 252 253 Args: 254 fastboot_device_controller: 255 The interface to interact with a fastboot device. 256 serial_mapper: 257 The interface to get the USB physical location to serial number map. 258 configs: 259 The additional configurations. Need to contain 'ATFA_REBOOT_TIMEOUT'. 260 """ 261 # The timeout period for ATFA device reboot. 262 self.ATFA_REBOOT_TIMEOUT = 30 263 if configs and 'ATFA_REBOOT_TIMEOUT' in configs: 264 try: 265 self.ATFA_REBOOT_TIMEOUT = float(configs['ATFA_REBOOT_TIMEOUT']) 266 except ValueError: 267 pass 268 269 # The serial numbers for the devices that are at least seen twice. 270 self.stable_serials = [] 271 # The serail numbers for the devices that are only seen once. 272 self.pending_serials = [] 273 # The atfa device DeviceInfo object. 274 self.atfa_dev = None 275 # The atfa device currently rebooting to set the os. 276 self._atfa_dev_setting = None 277 # The list of target devices DeviceInfo objects. 278 self.target_devs = [] 279 # The product information for the selected product. 280 self.product_info = None 281 # The atfa device manager. 282 self._atfa_dev_manager = AtfaDeviceManager(self) 283 # The fastboot controller. 284 self._fastboot_device_controller = fastboot_device_controller 285 # The map mapping serial number to USB location. 286 self._serial_mapper = serial_mapper() 287 # The map mapping rebooting device serial number to their reboot callback 288 # objects. 289 self._reboot_callbacks = {} 290 291 self._atfa_reboot_lock = threading.Lock() 292 293 def GetATFAKeysLeft(self): 294 if not self.atfa_dev: 295 return None 296 return self.atfa_dev.keys_left 297 298 def CheckATFAStatus(self): 299 return self._atfa_dev_manager.CheckStatus() 300 301 def SwitchATFAStorage(self): 302 if self._fastboot_device_controller.GetHostOs() == 'Windows': 303 # Only windows need to switch. For Linux the partition should already 304 # mounted. 305 self._atfa_dev_manager.SwitchStorage() 306 else: 307 self.CheckDevice(self.atfa_dev) 308 309 def RebootATFA(self): 310 return self._atfa_dev_manager.Reboot() 311 312 def ShutdownATFA(self): 313 return self._atfa_dev_manager.Shutdown() 314 315 def ProcessATFAKey(self): 316 return self._atfa_dev_manager.ProcessKey() 317 318 def ListDevices(self, sort_by=SORT_BY_LOCATION): 319 """Get device list. 320 321 Get the serial number of the ATFA device and the target device. If the 322 device does not exist, the returned serial number would be None. 323 324 Args: 325 sort_by: The field to sort by. 326 """ 327 # ListDevices returns a list of USBHandles 328 device_serials = self._fastboot_device_controller.ListDevices() 329 self.UpdateDevices(device_serials) 330 self._HandleRebootCallbacks() 331 self._SortTargetDevices(sort_by) 332 333 def UpdateDevices(self, device_serials): 334 """Update device list. 335 336 Args: 337 device_serials: The device serial numbers. 338 """ 339 self._UpdateSerials(device_serials) 340 if not self.stable_serials: 341 self.target_devs = [] 342 self.atfa_dev = None 343 return 344 self._HandleSerials() 345 346 @staticmethod 347 def _SerialAsKey(device): 348 return device.serial_number 349 350 @staticmethod 351 def _LocationAsKey(device): 352 if device.location is None: 353 return '' 354 return device.location 355 356 def _SortTargetDevices(self, sort_by): 357 """Sort the target device list according to sort_by field. 358 359 Args: 360 sort_by: The field to sort by, possible values are: 361 self.SORT_BY_LOCATION and self.SORT_BY_SERIAL. 362 """ 363 if sort_by == self.SORT_BY_LOCATION: 364 self.target_devs.sort(key=AtftManager._LocationAsKey) 365 elif sort_by == self.SORT_BY_SERIAL: 366 self.target_devs.sort(key=AtftManager._SerialAsKey) 367 368 def _UpdateSerials(self, device_serials): 369 """Update the stored pending_serials and stable_serials. 370 371 Note that we cannot check status once the fastboot device is found since the 372 device may not be ready yet. So we put the new devices into the pending 373 state. Once we see the device again in the next refresh, we add that device. 374 If that device is not seen in the next refresh, we remove it from pending. 375 This makes sure that the device would have a refresh interval time after 376 it's recognized as a fastboot device until it's issued command. 377 378 Args: 379 device_serials: The list of serial numbers of the fastboot devices. 380 """ 381 stable_serials_copy = self.stable_serials[:] 382 pending_serials_copy = self.pending_serials[:] 383 self.stable_serials = [] 384 self.pending_serials = [] 385 for serial in device_serials: 386 if serial in stable_serials_copy or serial in pending_serials_copy: 387 # Was in stable or pending state, seen twice, add to stable state. 388 self.stable_serials.append(serial) 389 else: 390 # First seen, add to pending state. 391 self.pending_serials.append(serial) 392 393 def _CheckAtfaSetOs(self): 394 """Check whether the ATFA device reappear after a 'set-os' command. 395 396 If it reappears, we create a new ATFA device object. 397 If not, something wrong happens, we need to clean the rebooting state. 398 """ 399 atfa_serial = self._atfa_dev_setting.serial_number 400 if atfa_serial in self.stable_serials: 401 # We found the ATFA device again. 402 self._serial_mapper.refresh_serial_map() 403 controller = self._fastboot_device_controller(atfa_serial) 404 location = self._serial_mapper.get_location(atfa_serial) 405 self.atfa_dev = DeviceInfo(controller, atfa_serial, location) 406 407 # Clean the state 408 self._atfa_dev_setting = None 409 self._atfa_reboot_lock.release() 410 411 def _HandleSerials(self): 412 """Create new devices and remove old devices. 413 414 Add device location information and target device provision status. 415 """ 416 device_serials = self.stable_serials 417 new_targets = [] 418 atfa_serial = None 419 for serial in device_serials: 420 if not serial: 421 continue 422 423 if serial.startswith('ATFA'): 424 atfa_serial = serial 425 else: 426 new_targets.append(serial) 427 428 if atfa_serial is None: 429 # No ATFA device found. 430 self.atfa_dev = None 431 elif self.atfa_dev is None or self.atfa_dev.serial_number != atfa_serial: 432 self._AddNewAtfa(atfa_serial) 433 434 # Remove those devices that are not in new targets and not rebooting. 435 self.target_devs = [ 436 device for device in self.target_devs 437 if (device.serial_number in new_targets or 438 device.provision_status == ProvisionStatus.REBOOT_ING) 439 ] 440 441 common_serials = [device.serial_number for device in self.target_devs] 442 443 # Create new device object for newly added devices. 444 self._serial_mapper.refresh_serial_map() 445 for serial in new_targets: 446 if serial not in common_serials: 447 self._CreateNewTargetDevice(serial) 448 449 def _CreateNewTargetDevice(self, serial, check_status=True): 450 """Create a new target device object. 451 452 Args: 453 serial: The serial number for the new target device. 454 check_status: Whether to check provision status for the target device. 455 """ 456 try: 457 controller = self._fastboot_device_controller(serial) 458 location = self._serial_mapper.get_location(serial) 459 460 new_target_dev = DeviceInfo(controller, serial, location) 461 if check_status: 462 self.CheckProvisionStatus(new_target_dev) 463 self.target_devs.append(new_target_dev) 464 except FastbootFailure as e: 465 e.msg = ('Error while creating new device: ' + str(new_target_dev) + 466 '\n'+ e.msg) 467 self.stable_serials.remove(serial) 468 raise e 469 470 def _AddNewAtfa(self, atfa_serial): 471 """Create a new ATFA device object. 472 473 If the OS variable on the ATFA device is not the same as the host OS 474 version, we would use set the correct OS version. 475 476 Args: 477 atfa_serial: The serial number of the ATFA device to be added. 478 """ 479 self._serial_mapper.refresh_serial_map() 480 controller = self._fastboot_device_controller(atfa_serial) 481 location = self._serial_mapper.get_location(atfa_serial) 482 if self._atfa_reboot_lock.acquire(False): 483 # If there's not an atfa setting os already happening 484 self._atfa_dev_setting = DeviceInfo(controller, atfa_serial, location) 485 try: 486 atfa_os = self._GetOs(self._atfa_dev_setting) 487 except FastbootFailure: 488 # The device is not ready for get OS command, we just ignore the device. 489 self._atfa_reboot_lock.release() 490 return 491 host_os = controller.GetHostOs() 492 if atfa_os == host_os: 493 # The OS set for the ATFA is correct, we just create the new device. 494 self.atfa_dev = self._atfa_dev_setting 495 self._atfa_dev_setting = None 496 self._atfa_reboot_lock.release() 497 else: 498 # The OS set for the ATFA is not correct, need to set it. 499 try: 500 self._SetOs(self._atfa_dev_setting, host_os) 501 # SetOs include a rebooting process, but the device would not 502 # disappear from the device list immediately after the command. 503 # We would check if the ATFA reappear after ATFA_REBOOT_TIMEOUT. 504 timer = threading.Timer( 505 self.ATFA_REBOOT_TIMEOUT, self._CheckAtfaSetOs) 506 timer.start() 507 except FastbootFailure: 508 self._atfa_dev_setting = None 509 self._atfa_reboot_lock.release() 510 511 def _SetOs(self, target_dev, os_version): 512 """Change the os version on the target device. 513 514 Args: 515 target_dev: The target device. 516 os_version: The os version to set, options are 'Windows' or 'Linux'. 517 Raises: 518 FastbootFailure: when fastboot command fails. 519 """ 520 target_dev.Oem('set-os ' + os_version) 521 522 def _GetOs(self, target_dev): 523 """Get the os version of the target device. 524 525 Args: 526 target_dev: The target deivce. 527 Returns: 528 The os version. 529 Raises: 530 FastbootFailure: when fastboot command fails. 531 """ 532 output = target_dev.Oem('get-os', True) 533 if output and 'Linux' in output: 534 return 'Linux' 535 else: 536 return 'Windows' 537 538 def _HandleRebootCallbacks(self): 539 """Handle the callback functions after the reboot.""" 540 success_serials = [] 541 for serial in self._reboot_callbacks: 542 if serial in self.stable_serials: 543 callback_lock = self._reboot_callbacks[serial].lock 544 # Make sure the timeout callback would not be called at the same time. 545 if callback_lock and callback_lock.acquire(False): 546 success_serials.append(serial) 547 548 for serial in success_serials: 549 self._reboot_callbacks[serial].success() 550 551 def _ParseStateString(self, state_string): 552 """Parse the string returned by 'at-vboot-state' to a key-value map. 553 554 Args: 555 state_string: The string returned by oem at-vboot-state command. 556 557 Returns: 558 A key-value map. 559 """ 560 state_map = {} 561 lines = state_string.splitlines() 562 for line in lines: 563 if line.startswith(BOOTLOADER_STRING): 564 key_value = re.split(r':[\s]*|=', line.replace(BOOTLOADER_STRING, '')) 565 if len(key_value) == 2: 566 state_map[key_value[0]] = key_value[1] 567 return state_map 568 569 def CheckProvisionStatus(self, target_dev): 570 """Check whether the target device has been provisioned. 571 572 Args: 573 target_dev: The target device (DeviceInfo). 574 """ 575 at_attest_uuid = target_dev.GetVar('at-attest-uuid') 576 state_string = target_dev.GetVar('at-vboot-state') 577 578 target_dev.provision_status = ProvisionStatus.IDLE 579 target_dev.provision_state = ProvisionState() 580 581 status_set = False 582 583 # TODO(shanyu): We only need empty string here 584 # NOT_PROVISIONED is for test purpose. 585 if at_attest_uuid and at_attest_uuid != 'NOT_PROVISIONED': 586 target_dev.provision_status = ProvisionStatus.PROVISION_SUCCESS 587 status_set = True 588 target_dev.provision_state.provisioned = True 589 590 # state_string should be in format: 591 # (bootloader) bootloader-locked: 1 592 # (bootloader) bootloader-min-versions: -1,0,3 593 # (bootloader) avb-perm-attr-set: 1 594 # (bootloader) avb-locked: 0 595 # (bootloader) avb-unlock-disabled: 0 596 # (bootloader) avb-min-versions: 0:1,1:1,2:1,4097 :2,4098:2 597 if not state_string: 598 return 599 state_map = self._ParseStateString(state_string) 600 if state_map.get('avb-locked') and state_map['avb-locked'] == '1': 601 if not status_set: 602 target_dev.provision_status = ProvisionStatus.LOCKAVB_SUCCESS 603 status_set = True 604 target_dev.provision_state.avb_locked = True 605 606 if (state_map.get('avb-perm-attr-set') and 607 state_map['avb-perm-attr-set'] == '1'): 608 if not status_set: 609 target_dev.provision_status = ProvisionStatus.FUSEATTR_SUCCESS 610 status_set = True 611 target_dev.provision_state.avb_perm_attr_set = True 612 613 if (state_map.get('bootloader-locked') and 614 state_map['bootloader-locked'] == '1'): 615 if not status_set: 616 target_dev.provision_status = ProvisionStatus.FUSEVBOOT_SUCCESS 617 target_dev.provision_state.bootloader_locked = True 618 619 620 def TransferContent(self, src, dst): 621 """Transfer content from a device to another device. 622 623 Download file from one device and store it into a tmp file. Upload file from 624 the tmp file onto another device. 625 626 Args: 627 src: The source device to be copied from. 628 dst: The destination device to be copied to. 629 """ 630 # create a tmp folder 631 tmp_folder = tempfile.mkdtemp() 632 # temperate file name is a UUID based on host ID and current time. 633 tmp_file_name = str(uuid.uuid1()) 634 file_path = os.path.join(tmp_folder, tmp_file_name) 635 # pull file to local fs 636 src.Upload(file_path) 637 # push file to fastboot device 638 dst.Download(file_path) 639 # delete the temperate file afterwards 640 if os.path.exists(file_path): 641 os.remove(file_path) 642 # delete the temperate folder afterwards 643 if os.path.exists(tmp_folder): 644 os.rmdir(tmp_folder) 645 646 def GetTargetDevice(self, serial): 647 """Get the target DeviceInfo object according to the serial number. 648 649 Args: 650 serial: The serial number for the device object. 651 Returns: 652 The DeviceInfo object for the device. None if not exists. 653 """ 654 for device in self.target_devs: 655 if device.serial_number == serial: 656 return device 657 658 return None 659 660 def Provision(self, target): 661 """Provision the key to the target device. 662 663 1. Get supported encryption algorithm 664 2. Send atfa-start-provisioning message to ATFA 665 3. Transfer content from ATFA to target 666 4. Send at-get-ca-request to target 667 5. Transfer content from target to ATFA 668 6. Send atfa-finish-provisioning message to ATFA 669 7. Transfer content from ATFA to target 670 8. Send at-set-ca-response message to target 671 672 Args: 673 target: The target device to be provisioned to. 674 """ 675 try: 676 target.provision_status = ProvisionStatus.PROVISION_ING 677 atfa = self.atfa_dev 678 AtftManager.CheckDevice(atfa) 679 # Set the ATFA's time first. 680 self._atfa_dev_manager.SetTime() 681 algorithm_list = self._GetAlgorithmList(target) 682 algorithm = self._ChooseAlgorithm(algorithm_list) 683 # First half of the DH key exchange 684 atfa.Oem('atfa-start-provisioning ' + str(algorithm)) 685 self.TransferContent(atfa, target) 686 # Second half of the DH key exchange 687 target.Oem('at-get-ca-request') 688 self.TransferContent(target, atfa) 689 # Encrypt and transfer key bundle 690 atfa.Oem('atfa-finish-provisioning') 691 self.TransferContent(atfa, target) 692 # Provision the key on device 693 target.Oem('at-set-ca-response') 694 695 # After a success provision, the status should be updated. 696 self.CheckProvisionStatus(target) 697 if not target.provision_state.provisioned: 698 raise FastbootFailure('Status not updated.') 699 except (FastbootFailure, DeviceNotFoundException) as e: 700 target.provision_status = ProvisionStatus.PROVISION_FAILED 701 raise e 702 703 def FuseVbootKey(self, target): 704 """Fuse the verified boot key to the target device. 705 706 Args: 707 target: The target device. 708 """ 709 if not self.product_info: 710 target.provision_status = ProvisionStatus.FUSEVBOOT_FAILED 711 raise ProductNotSpecifiedException 712 713 # Create a temporary file to store the vboot key. 714 target.provision_status = ProvisionStatus.FUSEVBOOT_ING 715 try: 716 temp_file = tempfile.NamedTemporaryFile(delete=False) 717 temp_file.write(self.product_info.vboot_key) 718 temp_file.close() 719 temp_file_name = temp_file.name 720 target.Download(temp_file_name) 721 # Delete the temporary file. 722 os.remove(temp_file_name) 723 target.Oem('fuse at-bootloader-vboot-key') 724 725 # After a success fuse, the status should be updated. 726 self.CheckProvisionStatus(target) 727 if not target.provision_state.bootloader_locked: 728 raise FastbootFailure('Status not updated.') 729 730 # # Another possible flow: 731 # target.Flash('sec', temp_file_name) 732 # os.remove(temp_file_name) 733 except FastbootFailure as e: 734 target.provision_status = ProvisionStatus.FUSEVBOOT_FAILED 735 raise e 736 737 def FusePermAttr(self, target): 738 """Fuse the permanent attributes to the target device. 739 740 Args: 741 target: The target device. 742 """ 743 if not self.product_info: 744 target.provision_status = ProvisionStatus.FUSEATTR_FAILED 745 raise ProductNotSpecifiedException 746 try: 747 target.provision_status = ProvisionStatus.FUSEATTR_ING 748 temp_file = tempfile.NamedTemporaryFile(delete=False) 749 temp_file.write(self.product_info.product_attributes) 750 temp_file.close() 751 temp_file_name = temp_file.name 752 target.Download(temp_file_name) 753 os.remove(temp_file_name) 754 target.Oem('fuse at-perm-attr') 755 756 self.CheckProvisionStatus(target) 757 if not target.provision_state.avb_perm_attr_set: 758 raise FastbootFailure('Status not updated') 759 760 except FastbootFailure as e: 761 target.provision_status = ProvisionStatus.FUSEATTR_FAILED 762 raise e 763 764 def LockAvb(self, target): 765 """Lock the android verified boot for the target. 766 767 Args: 768 target: The target device. 769 """ 770 try: 771 target.provision_status = ProvisionStatus.LOCKAVB_ING 772 target.Oem('at-lock-vboot') 773 self.CheckProvisionStatus(target) 774 if not target.provision_state.avb_locked: 775 raise FastbootFailure('Status not updated') 776 except FastbootFailure as e: 777 target.provision_status = ProvisionStatus.LOCKAVB_FAILED 778 raise e 779 780 def Reboot(self, target, timeout, success_callback, timeout_callback): 781 """Reboot the target device. 782 783 Args: 784 target: The target device. 785 timeout: The time out value. 786 success_callback: The callback function called when the device reboots 787 successfully. 788 timeout_callback: The callback function called when the device reboots 789 timeout. 790 791 The device would disappear from the list after reboot. 792 If we see the device again within timeout, call the success_callback, 793 otherwise call the timeout_callback. 794 """ 795 try: 796 target.Reboot() 797 serial = target.serial_number 798 location = target.location 799 # We assume after the reboot the device would disappear 800 self.target_devs.remove(target) 801 del target 802 self.stable_serials.remove(serial) 803 # Create a rebooting target device that only contains serial and location. 804 rebooting_target = DeviceInfo(None, serial, location) 805 rebooting_target.provision_status = ProvisionStatus.REBOOT_ING 806 self.target_devs.append(rebooting_target) 807 808 reboot_callback = RebootCallback( 809 timeout, 810 self.RebootCallbackWrapper(success_callback, serial, True), 811 self.RebootCallbackWrapper(timeout_callback, serial, False)) 812 self._reboot_callbacks[serial] = reboot_callback 813 814 except FastbootFailure as e: 815 target.provision_status = ProvisionStatus.REBOOT_FAILED 816 raise e 817 818 def RebootCallbackWrapper(self, callback, serial, success): 819 """This wrapper function wraps the original callback function. 820 821 Some clean up operations are added. We need to remove the handler if 822 callback is called. We need to release the resource the handler requires. 823 We also needs to remove the rebooting device from the target list since a 824 new device would be created if the device reboot successfully. 825 826 Args: 827 callback: The original callback function. 828 serial: The serial number for the device. 829 success: Whether this is the success callback. 830 Returns: 831 An extended callback function. 832 """ 833 def RebootCallbackFunc(callback=callback, serial=serial, success=success): 834 try: 835 rebooting_dev = self.GetTargetDevice(serial) 836 if rebooting_dev: 837 self.target_devs.remove(rebooting_dev) 838 del rebooting_dev 839 if success: 840 self._serial_mapper.refresh_serial_map() 841 self._CreateNewTargetDevice(serial, True) 842 self.GetTargetDevice(serial).provision_status = ( 843 ProvisionStatus.REBOOT_SUCCESS) 844 callback() 845 self._reboot_callbacks[serial].Release() 846 del self._reboot_callbacks[serial] 847 except FastbootFailure as e: 848 # Release the lock so that it can be obtained again. 849 self._reboot_callbacks[serial].lock.release() 850 raise e 851 852 853 854 return RebootCallbackFunc 855 856 def _GetAlgorithmList(self, target): 857 """Get the supported algorithm list. 858 859 Get the available algorithm list using getvar at-attest-dh 860 at_attest_dh should be in format 1:p256,2:curve25519 861 or 1:p256 862 or 2:curve25519. 863 864 Args: 865 target: The target device to check for supported algorithm. 866 Returns: 867 A list of available algorithms. 868 Options are ALGORITHM_P256 or ALGORITHM_CURVE25519 869 """ 870 at_attest_dh = target.GetVar('at-attest-dh') 871 algorithm_strings = at_attest_dh.split(',') 872 algorithm_list = [] 873 for algorithm_string in algorithm_strings: 874 algorithm_list.append(int(algorithm_string.split(':')[0])) 875 return algorithm_list 876 877 def _ChooseAlgorithm(self, algorithm_list): 878 """Choose the encryption algorithm to use for key provisioning. 879 880 We favor ALGORITHM_CURVE25519 over ALGORITHM_P256 881 882 Args: 883 algorithm_list: The list containing all available algorithms. 884 Returns: 885 The selected available algorithm 886 Raises: 887 NoAlgorithmAvailableException: 888 When there's no available valid algorithm to use. 889 """ 890 if not algorithm_list: 891 raise NoAlgorithmAvailableException() 892 if EncryptionAlgorithm.ALGORITHM_CURVE25519 in algorithm_list: 893 return EncryptionAlgorithm.ALGORITHM_CURVE25519 894 elif EncryptionAlgorithm.ALGORITHM_P256 in algorithm_list: 895 return EncryptionAlgorithm.ALGORITHM_P256 896 897 raise NoAlgorithmAvailableException() 898 899 def ProcessProductAttributesFile(self, content): 900 """Process the product attributes file. 901 902 The file should follow the following JSON format: 903 { 904 "productName": "", 905 "productDescription": "", 906 "productConsoleId": "", 907 "productPermanentAttribute": "", 908 "bootloaderPublicKey": "", 909 "creationTime": "" 910 } 911 912 Args: 913 content: The content of the product attributes file. 914 Raises: 915 ProductAttributesFileFormatError: When the file format is wrong. 916 """ 917 try: 918 file_object = json.loads(content) 919 except ValueError: 920 raise ProductAttributesFileFormatError( 921 'Wrong JSON format!') 922 product_name = file_object.get(self.JSON_PRODUCT_NAME) 923 attribute_string = file_object.get(self.JSON_PRODUCT_ATTRIBUTE) 924 vboot_key_string = file_object.get(self.JSON_VBOOT_KEY) 925 if not product_name or not attribute_string or not vboot_key_string: 926 raise ProductAttributesFileFormatError( 927 'Essential field missing!') 928 try: 929 attribute = base64.standard_b64decode(attribute_string) 930 attribute_array = bytearray(attribute) 931 if self.EXPECTED_ATTRIBUTE_LENGTH != len(attribute_array): 932 raise ProductAttributesFileFormatError( 933 'Incorrect permanent product attributes length') 934 935 # We only need the last 16 byte for product ID 936 # We store the hex representation of the product ID 937 product_id = self._ByteToHex(attribute_array[-16:]) 938 939 vboot_key_array = bytearray(base64.standard_b64decode(vboot_key_string)) 940 941 except TypeError: 942 raise ProductAttributesFileFormatError( 943 'Incorrect Base64 encoding for permanent product attributes') 944 945 self.product_info = ProductInfo(product_id, product_name, attribute_array, 946 vboot_key_array) 947 948 def _ByteToHex(self, byte_array): 949 """Transform a byte array into a hex string.""" 950 return ''.join('{:02x}'.format(x) for x in byte_array) 951 952 @staticmethod 953 def CheckDevice(device): 954 """Check if the device is a connected fastboot device. 955 956 Args: 957 device: The device to be checked. 958 Raises: 959 DeviceNotFoundException: When the device is not found 960 """ 961 if device is None: 962 raise DeviceNotFoundException() 963 964 965class AtfaDeviceManager(object): 966 """The class to manager ATFA device related operations.""" 967 968 def __init__(self, atft_manager): 969 """Initiate the atfa device manager using the at-factory-tool manager. 970 971 Args: 972 atft_manager: The at-factory-tool manager that 973 includes this atfa device manager. 974 """ 975 self.atft_manager = atft_manager 976 977 def GetSerial(self): 978 """Issue fastboot command to get serial number for the ATFA device. 979 980 Raises: 981 DeviceNotFoundException: When the device is not found. 982 """ 983 AtftManager.CheckDevice(self.atft_manager.atfa_dev) 984 self.atft_manager.atfa_dev.Oem('serial') 985 986 def SwitchStorage(self): 987 """Switch the ATFA device to storage mode. 988 989 Raises: 990 DeviceNotFoundException: When the device is not found 991 """ 992 AtftManager.CheckDevice(self.atft_manager.atfa_dev) 993 self.atft_manager.atfa_dev.Oem('storage') 994 995 def ProcessKey(self): 996 """Ask the ATFA device to process the stored key bundle. 997 998 Raises: 999 DeviceNotFoundException: When the device is not found 1000 """ 1001 # Need to set time first so that certificates would validate. 1002 self.SetTime() 1003 AtftManager.CheckDevice(self.atft_manager.atfa_dev) 1004 self.atft_manager.atfa_dev.Oem('process-keybundle') 1005 1006 def Reboot(self): 1007 """Reboot the ATFA device. 1008 1009 Raises: 1010 DeviceNotFoundException: When the device is not found 1011 """ 1012 AtftManager.CheckDevice(self.atft_manager.atfa_dev) 1013 self.atft_manager.atfa_dev.Oem('reboot') 1014 1015 def Shutdown(self): 1016 """Shutdown the ATFA device. 1017 1018 Raises: 1019 DeviceNotFoundException: When the device is not found 1020 """ 1021 AtftManager.CheckDevice(self.atft_manager.atfa_dev) 1022 self.atft_manager.atfa_dev.Oem('shutdown') 1023 1024 def CheckStatus(self): 1025 """Update the number of available AT keys for the current product. 1026 1027 Need to use GetKeysLeft() function to get the number of keys left. If some 1028 error happens, keys_left would be set to -1 to prevent checking again. 1029 1030 Raises: 1031 FastbootFailure: If error happens with the fastboot oem command. 1032 """ 1033 1034 if not self.atft_manager.product_info: 1035 raise ProductNotSpecifiedException() 1036 1037 AtftManager.CheckDevice(self.atft_manager.atfa_dev) 1038 # -1 means some error happens. 1039 self.atft_manager.atfa_dev.keys_left = -1 1040 out = self.atft_manager.atfa_dev.Oem( 1041 'num-keys ' + self.atft_manager.product_info.product_id, True) 1042 # Note: use splitlines instead of split('\n') to prevent '\r\n' problem on 1043 # windows. 1044 for line in out.splitlines(): 1045 if line.startswith('(bootloader) '): 1046 try: 1047 self.atft_manager.atfa_dev.keys_left = int( 1048 line.replace('(bootloader) ', '')) 1049 return 1050 except ValueError: 1051 raise FastbootFailure( 1052 'ATFA device response has invalid format') 1053 1054 raise FastbootFailure('ATFA device response has invalid format') 1055 1056 def SetTime(self): 1057 """Inject the host time into the ATFA device. 1058 1059 Raises: 1060 DeviceNotFoundException: When the device is not found 1061 """ 1062 AtftManager.CheckDevice(self.atft_manager.atfa_dev) 1063 time = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S') 1064 self.atft_manager.atfa_dev.Oem('set-date ' + time) 1065