1# Copyright (c) 2013 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4"""Code to provide functions for FAFT tests. 5 6These will be exposed via an xmlrpc server running on the DUT. 7 8@note: When adding categories, please also update server/cros/faft/rpc_proxy.pyi 9""" 10import httplib 11import os 12import sys 13import tempfile 14import traceback 15import xmlrpclib 16 17from autotest_lib.client.common_lib.cros import cros_config 18from autotest_lib.client.cros.faft.utils import ( 19 cgpt_handler, 20 os_interface, 21 firmware_check_keys, 22 firmware_updater, 23 flashrom_handler, 24 kernel_handler, 25 rootfs_handler, 26 tpm_handler, 27) 28 29 30class RPCRouter(object): 31 """ 32 A class which routes RPC methods to the proper servicers. 33 34 Firmware tests are able to call an RPC method via: 35 FAFTClient.[category].[method_name](params) 36 When XML-RPC is being used, the RPC server routes the called method to: 37 RPCHandler._dispatch('[category].[method]', params) 38 The method is then dispatched to a Servicer class. 39 """ 40 41 def __init__(self, os_if): 42 """Initialize the servicer for each category. 43 44 @type os_if: os_interface.OSInterface 45 """ 46 self.bios = BiosServicer(os_if) 47 self.cgpt = CgptServicer(os_if) 48 self.ec = EcServicer(os_if) 49 self.kernel = KernelServicer(os_if) 50 self.rootfs = RootfsServicer(os_if) 51 self.rpc_settings = RpcSettingsServicer(os_if) 52 self.system = SystemServicer(os_if) 53 self.tpm = TpmServicer(os_if) 54 self.updater = UpdaterServicer(os_if) 55 56 self._rpc_servicers = { 57 'bios': self.bios, 58 'cgpt': self.cgpt, 59 'ec': self.ec, 60 'kernel': self.kernel, 61 'rpc_settings': self.rpc_settings, 62 'rootfs': self.rootfs, 63 'system': self.system, 64 'tpm': self.tpm, 65 'updater': self.updater 66 } 67 68 self._os_if = os_if 69 70 def _report_error(self, fault_code, message, exc_info=None): 71 """Raise the given RPC error text, including information about last 72 exception from sys.exc_info(). The log file gets the traceback in text; 73 the raised exception keeps the old traceback (but not in text). 74 75 Note: this must be called right after the original exception, or it may 76 report the wrong exception. 77 78 @raise: xmlrpclib.Fault 79 80 @param fault_code: the status code to use 81 @param message: the string message to include before exception text 82 @param exc_info: the tuple from sys.exc_info() 83 @return the exception to raise 84 85 @type fault_code: int 86 @type message: str 87 @type exc_info: bool 88 @rtype: Exception 89 """ 90 if exc_info: 91 tb = None 92 try: 93 (exc_class, exc, tb) = sys.exc_info() 94 95 tb_str = ''.join( 96 traceback.format_exception(exc_class, exc, tb)) 97 self._os_if.log('Error: %s.\n%s' % (message, tb_str.rstrip())) 98 99 exc_str = ''.join( 100 traceback.format_exception_only(exc_class, exc)) 101 exc = xmlrpclib.Fault( 102 fault_code, '%s. %s' % (message, exc_str.rstrip())) 103 raise exc, None, tb 104 finally: 105 del exc_info 106 del tb 107 else: 108 self._os_if.log('Error: %s' % message) 109 return xmlrpclib.Fault(fault_code, message) 110 111 def _dispatch(self, called_method, params): 112 """ 113 Send any RPC call to the appropriate servicer method. 114 115 @param called_method: The method of FAFTClient that was called. 116 Should take the form 'category.method'. 117 @param params: The arguments passed into the method. 118 119 @type called_method: str 120 @type params: tuple 121 122 @raise: xmlrpclib.Fault (using http error codes for fault codes) 123 """ 124 self._os_if.log('Called: %s%s' % (called_method, params)) 125 126 name_pieces = called_method.split('.') 127 num_pieces = len(name_pieces) 128 129 if num_pieces < 1: 130 raise self._report_error( 131 httplib.BAD_REQUEST, 132 'RPC request is invalid (completely empty): "%s"' % 133 called_method) 134 135 if num_pieces < 2: 136 # must be category.func (maybe with .__str__) 137 raise self._report_error( 138 httplib.BAD_REQUEST, 139 'RPC request is invalid (must have category.method format):' 140 ' "%s"' % called_method) 141 142 method_name = name_pieces.pop() 143 category = '.'.join(name_pieces) 144 145 if (method_name.startswith('_') 146 and method_name not in ('__str__', '__repr__', '__call__')): 147 # *._private() or *.__special__() 148 # Forbid early, to prevent seeing which methods exist. 149 raise self._report_error( 150 httplib.FORBIDDEN, 151 'RPC method name is private: %s.[%s]' % 152 (category, method_name)) 153 154 if not method_name: 155 # anything.() 156 raise self._report_error( 157 httplib.BAD_REQUEST, 158 'RPC method name is empty: %s.[%s]' % 159 (category, method_name)) 160 161 if category in self._rpc_servicers: 162 # system.func() 163 holder = self._rpc_servicers[category] 164 if not hasattr(holder, method_name): 165 raise self._report_error( 166 httplib.NOT_FOUND, 167 'RPC method not found: %s.[%s]' % 168 (category, method_name)) 169 170 elif category: 171 # invalid.func() 172 raise self._report_error( 173 httplib.NOT_FOUND, 174 'RPC category not found: [%s].%s' % 175 (category, method_name)) 176 177 else: 178 # .invalid() 179 raise self._report_error( 180 httplib.BAD_REQUEST, 181 'RPC request is invalid (empty category name): [%s].%s' % 182 (category, method_name)) 183 184 try: 185 method = getattr(holder, method_name) 186 187 except AttributeError: 188 raise self._report_error( 189 httplib.NOT_IMPLEMENTED, 190 'RPC method not found: "%s"' % called_method, exc_info=True) 191 192 try: 193 return method(*params) 194 195 except Exception: 196 raise self._report_error( 197 httplib.INTERNAL_SERVER_ERROR, 198 'RPC call failed: %s()' % called_method, exc_info=True) 199 200 201class BiosServicer(object): 202 """Class to service all BIOS RPCs""" 203 204 def __init__(self, os_if): 205 """ 206 @type os_if: os_interface.OSInterface 207 """ 208 self._os_if = os_if 209 210 # This attribute is accessed via a property, so it can load lazily 211 # when actually used by the test. 212 self._real_bios_handler = flashrom_handler.FlashromHandler( 213 self._os_if, None, '/usr/share/vboot/devkeys', 'bios') 214 215 @property 216 def _bios_handler(self): 217 """Return the BIOS flashrom handler, after initializing it if necessary 218 219 @rtype: flashrom_handler.FlashromHandler 220 """ 221 if not self._real_bios_handler.initialized: 222 self._real_bios_handler.init() 223 return self._real_bios_handler 224 225 def reload(self): 226 """Reload the firmware image that may be changed.""" 227 self._bios_handler.new_image() 228 229 def get_gbb_flags(self): 230 """Get the GBB flags. 231 232 @return: An integer of the GBB flags. 233 """ 234 return self._bios_handler.get_gbb_flags() 235 236 def set_gbb_flags(self, flags): 237 """Set the GBB flags. 238 239 @param flags: An integer of the GBB flags. 240 """ 241 self._bios_handler.set_gbb_flags(flags, write_through=True) 242 243 def get_preamble_flags(self, section): 244 """Get the preamble flags of a firmware section. 245 246 @param section: A firmware section, either 'a' or 'b'. 247 @return: An integer of the preamble flags. 248 """ 249 return self._bios_handler.get_section_flags(section) 250 251 def set_preamble_flags(self, section, flags): 252 """Set the preamble flags of a firmware section. 253 254 @param section: A firmware section, either 'a' or 'b'. 255 @param flags: An integer of preamble flags. 256 """ 257 version = self.get_version(section) 258 self._bios_handler.set_section_version( 259 section, version, flags, write_through=True) 260 261 def get_body_sha(self, section): 262 """Get SHA1 hash of BIOS RW firmware section. 263 264 @param section: A firmware section, either 'a' or 'b'. 265 @return: A string of the body SHA1 hash. 266 """ 267 return self._bios_handler.get_section_sha(section) 268 269 def get_sig_sha(self, section): 270 """Get SHA1 hash of firmware vblock in section. 271 272 @param section: A firmware section, either 'a' or 'b'. 273 @return: A string of the sig SHA1 hash. 274 """ 275 return self._bios_handler.get_section_sig_sha(section) 276 277 def get_section_fwid(self, section=None): 278 """Retrieve the RO or RW fwid. 279 280 @param section: A firmware section, either 'a' or 'b'. 281 @return: A string of the fwid 282 """ 283 return self._bios_handler.get_section_fwid(section) 284 285 def corrupt_sig(self, section): 286 """Corrupt the requested firmware section signature. 287 288 @param section: A firmware section, either 'a' or 'b'. 289 """ 290 self._bios_handler.corrupt_firmware(section) 291 292 def restore_sig(self, section): 293 """Restore the previously corrupted firmware section signature. 294 295 @param section: A firmware section, either 'a' or 'b'. 296 """ 297 self._bios_handler.restore_firmware(section) 298 299 def corrupt_body(self, section, corrupt_all=False): 300 """Corrupt the requested firmware section body. 301 302 @param section: A firmware section, either 'a' or 'b'. 303 @param corrupt_all (optional): Corrupt all bytes of the fw section, 304 rather than just one byte. 305 """ 306 self._bios_handler.corrupt_firmware_body(section, corrupt_all) 307 308 def restore_body(self, section): 309 """Restore the previously corrupted firmware section body. 310 311 @param section: A firmware section, either 'a' or 'b'. 312 """ 313 self._bios_handler.restore_firmware_body(section) 314 315 def _modify_version(self, section, delta): 316 """Modify firmware version for the requested section, by adding delta. 317 318 The passed in delta, a positive or a negative number, is added to the 319 original firmware version. 320 """ 321 original_version = self.get_version(section) 322 new_version = original_version + delta 323 flags = self._bios_handler.get_section_flags(section) 324 self._os_if.log('Setting firmware section %s version from %d to %d' % 325 (section, original_version, new_version)) 326 self._bios_handler.set_section_version( 327 section, new_version, flags, write_through=True) 328 329 def move_version_backward(self, section): 330 """Decrement firmware version for the requested section.""" 331 self._modify_version(section, -1) 332 333 def move_version_forward(self, section): 334 """Increase firmware version for the requested section.""" 335 self._modify_version(section, 1) 336 337 def get_version(self, section): 338 """Retrieve firmware version of a section.""" 339 return self._bios_handler.get_section_version(section) 340 341 def get_datakey_version(self, section): 342 """Return firmware data key version.""" 343 return self._bios_handler.get_section_datakey_version(section) 344 345 def get_kernel_subkey_version(self, section): 346 """Return kernel subkey version.""" 347 return self._bios_handler.get_section_kernel_subkey_version(section) 348 349 def dump_whole(self, bios_path): 350 """Dump the current BIOS firmware to a file, specified by bios_path. 351 352 @param bios_path: The path of the BIOS image to be written. 353 """ 354 self._bios_handler.dump_whole(bios_path) 355 356 def write_whole(self, bios_path): 357 """Write the firmware from bios_path to the current system. 358 359 @param bios_path: The path of the source BIOS image 360 """ 361 self._bios_handler.new_image(bios_path) 362 self._bios_handler.write_whole() 363 364 def strip_modified_fwids(self): 365 """Strip any trailing suffixes (from modify_fwids) out of the FWIDs. 366 367 @return: a dict of any fwids that were adjusted, by section (ro, a, b) 368 @rtype: dict 369 """ 370 return self._bios_handler.strip_modified_fwids() 371 372 def set_write_protect_region(self, region, enabled=None): 373 """Modify software write protect region and flag in one operation. 374 375 @param region: Region to set (usually WP_RO) 376 @param enabled: If True, run --wp-enable; if False, run --wp-disable. 377 If None (default), don't specify either one. 378 """ 379 self._bios_handler.set_write_protect_region(region, enabled) 380 381 def set_write_protect_range(self, start, length, enabled=None): 382 """Modify software write protect range and flag in one operation. 383 384 @param start: offset (bytes) from start of flash to start of range 385 @param length: offset (bytes) from start of range to end of range 386 @param enabled: If True, run --wp-enable; if False, run --wp-disable. 387 If None (default), don't specify either one. 388 """ 389 self._bios_handler.set_write_protect_range(start, length, enabled) 390 391 def get_write_protect_status(self): 392 """Get a dict describing the status of the write protection 393 394 @return: {'enabled': True/False, 'start': '0x0', 'length': '0x0', ...} 395 @rtype: dict 396 """ 397 return self._bios_handler.get_write_protect_status() 398 399 def is_available(self): 400 """Return True if available, False if not.""" 401 # Use the real handler, to avoid .init() raising an exception 402 return self._real_bios_handler.is_available() 403 404 405class CgptServicer(object): 406 """Class to service all CGPT RPCs""" 407 408 def __init__(self, os_if): 409 """ 410 @type os_if: os_interface.OSInterface 411 """ 412 self._os_if = os_if 413 self._cgpt_handler = cgpt_handler.CgptHandler(self._os_if) 414 415 def get_attributes(self): 416 """Get kernel attributes.""" 417 rootdev = self._os_if.get_root_dev() 418 self._cgpt_handler.read_device_info(rootdev) 419 return { 420 'A': self._cgpt_handler.get_partition(rootdev, 'KERN-A'), 421 'B': self._cgpt_handler.get_partition(rootdev, 'KERN-B') 422 } 423 424 def set_attributes(self, a=None, b=None): 425 """Set kernel attributes for either partition (or both).""" 426 partitions = {'A': a, 'B': b} 427 rootdev = self._os_if.get_root_dev() 428 modifiable_attributes = self._cgpt_handler.ATTR_TO_COMMAND.keys() 429 for partition_name in partitions.keys(): 430 partition = partitions[partition_name] 431 if partition is None: 432 continue 433 attributes_to_set = { 434 key: partition[key] 435 for key in modifiable_attributes 436 } 437 if attributes_to_set: 438 self._cgpt_handler.set_partition( 439 rootdev, 'KERN-%s' % partition_name, attributes_to_set) 440 441 442class EcServicer(object): 443 """Class to service all EC RPCs""" 444 445 def __init__(self, os_if): 446 """ 447 @type os_if: os_interface.OSInterface 448 """ 449 self._os_if = os_if 450 451 # This attribute is accessed via a property, so it can load lazily 452 # when actually used by the test. 453 self._real_ec_handler = None 454 ec_status = self._os_if.run_shell_command_get_status('mosys ec info') 455 if ec_status == 0: 456 self._real_ec_handler = flashrom_handler.FlashromHandler( 457 self._os_if, 'ec_root_key.vpubk', 458 '/usr/share/vboot/devkeys', 'ec') 459 460 else: 461 self._os_if.log('No EC is reported by mosys (rc=%s).' % ec_status) 462 463 @property 464 def _ec_handler(self): 465 """Return the EC flashrom handler, after initializing it if necessary 466 467 @rtype: flashrom_handler.FlashromHandler 468 """ 469 if not self._real_ec_handler: 470 # No EC handler if board has no EC 471 return None 472 473 if not self._real_ec_handler.initialized: 474 self._real_ec_handler.init() 475 return self._real_ec_handler 476 477 def reload(self): 478 """Reload the firmware image that may be changed.""" 479 self._ec_handler.new_image() 480 481 def get_version(self): 482 """Get EC version via mosys. 483 484 @return: A string of the EC version. 485 """ 486 return self._os_if.run_shell_command_get_output( 487 'mosys ec info | sed "s/.*| //"')[0] 488 489 def get_active_hash(self): 490 """Get hash of active EC RW firmware.""" 491 return self._os_if.run_shell_command_get_output( 492 'ectool echash | grep hash: | sed "s/hash:\s\+//"')[0] 493 494 def dump_whole(self, ec_path): 495 """Dump the current EC firmware to a file, specified by ec_path. 496 497 @param ec_path: The path of the EC image to be written. 498 """ 499 self._ec_handler.dump_whole(ec_path) 500 501 def write_whole(self, ec_path): 502 """Write the firmware from ec_path to the current system. 503 504 @param ec_path: The path of the source EC image. 505 """ 506 self._ec_handler.new_image(ec_path) 507 self._ec_handler.write_whole() 508 509 def corrupt_body(self, section): 510 """Corrupt the requested EC section body. 511 512 @param section: An EC section, either 'a' or 'b'. 513 """ 514 self._ec_handler.corrupt_firmware_body(section, corrupt_all=True) 515 516 def dump_firmware(self, ec_path): 517 """Dump the current EC firmware to a file, specified by ec_path. 518 519 @param ec_path: The path of the EC image to be written. 520 """ 521 self._ec_handler.dump_whole(ec_path) 522 523 def set_write_protect(self, enable): 524 """Enable write protect of the EC flash chip. 525 526 @param enable: True if activating EC write protect. Otherwise, False. 527 """ 528 if enable: 529 self._ec_handler.enable_write_protect() 530 else: 531 self._ec_handler.disable_write_protect() 532 533 def is_efs(self): 534 """Return True if the EC supports EFS.""" 535 return self._ec_handler.has_section_body('rw_b') 536 537 def copy_rw(self, from_section, to_section): 538 """Copy EC RW from from_section to to_section.""" 539 self._ec_handler.copy_from_to(from_section, to_section) 540 541 def reboot_to_switch_slot(self): 542 """Reboot EC to switch the active RW slot.""" 543 self._os_if.run_shell_command( 544 'ectool reboot_ec cold switch-slot', modifies_device=True) 545 546 def strip_modified_fwids(self): 547 """Strip any trailing suffixes (from modify_fwids) out of the FWIDs.""" 548 return self._ec_handler.strip_modified_fwids() 549 550 551class KernelServicer(object): 552 """Class to service all Kernel RPCs""" 553 554 def __init__(self, os_if): 555 """ 556 @type os_if: os_interface.OSInterface 557 """ 558 self._os_if = os_if 559 self._kernel_handler = kernel_handler.KernelHandler() 560 self._kernel_handler.init( 561 self._os_if, 562 dev_key_path='/usr/share/vboot/devkeys', 563 internal_disk=True) 564 565 def corrupt_sig(self, section): 566 """Corrupt the requested kernel section. 567 568 @param section: A kernel section, either 'a' or 'b'. 569 """ 570 self._kernel_handler.corrupt_kernel(section) 571 572 def restore_sig(self, section): 573 """Restore the requested kernel section (previously corrupted). 574 575 @param section: A kernel section, either 'a' or 'b'. 576 """ 577 self._kernel_handler.restore_kernel(section) 578 579 def _modify_version(self, section, delta): 580 """Modify kernel version for the requested section, by adding delta. 581 582 The passed in delta, a positive or a negative number, is added to the 583 original kernel version. 584 """ 585 original_version = self._kernel_handler.get_version(section) 586 new_version = original_version + delta 587 self._os_if.log('Setting kernel section %s version from %d to %d' % 588 (section, original_version, new_version)) 589 self._kernel_handler.set_version(section, new_version) 590 591 def move_version_backward(self, section): 592 """Decrement kernel version for the requested section.""" 593 self._modify_version(section, -1) 594 595 def move_version_forward(self, section): 596 """Increase kernel version for the requested section.""" 597 self._modify_version(section, 1) 598 599 def get_version(self, section): 600 """Return kernel version.""" 601 return self._kernel_handler.get_version(section) 602 603 def get_datakey_version(self, section): 604 """Return kernel datakey version.""" 605 return self._kernel_handler.get_datakey_version(section) 606 607 def diff_a_b(self): 608 """Compare kernel A with B. 609 610 @return: True: if kernel A is different with B. 611 False: if kernel A is the same as B. 612 """ 613 rootdev = self._os_if.get_root_dev() 614 kernel_a = self._os_if.join_part(rootdev, '2') 615 kernel_b = self._os_if.join_part(rootdev, '4') 616 617 # The signature (some kind of hash) for the kernel body is stored in 618 # the beginning. So compare the first 64KB (including header, preamble, 619 # and signature) should be enough to check them identical. 620 header_a = self._os_if.read_partition(kernel_a, 0x10000) 621 header_b = self._os_if.read_partition(kernel_b, 0x10000) 622 623 return header_a != header_b 624 625 def resign_with_keys(self, section, key_path=None): 626 """Resign kernel with temporary key.""" 627 self._kernel_handler.resign_kernel(section, key_path) 628 629 def dump(self, section, kernel_path): 630 """Dump the specified kernel to a file. 631 632 @param section: The kernel to dump. May be A or B. 633 @param kernel_path: The path to the kernel image to be written. 634 """ 635 self._kernel_handler.dump_kernel(section, kernel_path) 636 637 def write(self, section, kernel_path): 638 """Write a kernel image to the specified section. 639 640 @param section: The kernel to dump. May be A or B. 641 @param kernel_path: The path to the kernel image. 642 """ 643 self._kernel_handler.write_kernel(section, kernel_path) 644 645 def get_sha(self, section): 646 """Return the SHA1 hash of the specified kernel section.""" 647 return self._kernel_handler.get_sha(section) 648 649 650class RootfsServicer(object): 651 """Class to service all RootFS RPCs""" 652 653 def __init__(self, os_if): 654 """ 655 @type os_if: os_interface.OSInterface 656 """ 657 self._os_if = os_if 658 self._rootfs_handler = rootfs_handler.RootfsHandler() 659 self._rootfs_handler.init(self._os_if) 660 661 def verify_rootfs(self, section): 662 """Verifies the integrity of the root FS. 663 664 @param section: The rootfs to verify. May be A or B. 665 """ 666 return self._rootfs_handler.verify_rootfs(section) 667 668 669class RpcSettingsServicer(object): 670 """Class to service RPCs for settings of the RPC server itself""" 671 672 def __init__(self, os_if): 673 """ 674 @type os_if: os_interface.OSInterface 675 """ 676 self._os_if = os_if 677 678 def enable_test_mode(self): 679 """Enable test mode (avoids writing to flash or gpt)""" 680 self._os_if.test_mode = True 681 682 def disable_test_mode(self): 683 """Disable test mode and return to normal operation""" 684 self._os_if.test_mode = False 685 686 687class SystemServicer(object): 688 """Class to service all System RPCs""" 689 690 def __init__(self, os_if): 691 """ 692 @type os_if: os_interface.OSInterface 693 """ 694 self._os_if = os_if 695 self._key_checker = firmware_check_keys.firmwareCheckKeys() 696 697 def is_available(self): 698 """Function for polling the RPC server availability. 699 700 @return: Always True. 701 """ 702 return True 703 704 def dump_log(self, remove_log=False): 705 """Dump the log file. 706 707 @param remove_log: Remove the log file after dump. 708 @return: String of the log file content. 709 """ 710 with open(self._os_if.log_file) as f: 711 log = f.read() 712 if remove_log: 713 os.remove(self._os_if.log_file) 714 return log 715 716 def run_shell_command(self, command, block=True): 717 """Run shell command. 718 719 @param command: A shell command to be run. 720 @param block: if True (default), wait for command to finish 721 """ 722 self._os_if.run_shell_command(command, block=block) 723 724 def run_shell_command_check_output(self, command, success_token): 725 """Run shell command and check its stdout for a string. 726 727 @param command: A shell command to be run. 728 @param success_token: A string to search the output for. 729 @return: A Boolean indicating whether the success_token was found in 730 the command output. 731 """ 732 return self._os_if.run_shell_command_check_output( 733 command, success_token) 734 735 def run_shell_command_get_output(self, command, include_stderr=False): 736 """Run shell command and get its console output. 737 738 @param command: A shell command to be run. 739 @return: A list of strings stripped of the newline characters. 740 """ 741 return self._os_if.run_shell_command_get_output(command, include_stderr) 742 743 def run_shell_command_get_status(self, command): 744 """Run shell command and get its console status. 745 746 @param command: A shell command to be run. 747 @return: The returncode of the process 748 @rtype: int 749 """ 750 return self._os_if.run_shell_command_get_status(command) 751 752 def get_platform_name(self): 753 """Get the platform name of the current system. 754 755 @return: A string of the platform name. 756 """ 757 platform = cros_config.call_cros_config_get_output( 758 '/identity platform-name', 759 self._os_if.run_shell_command_get_result) 760 if not platform: 761 raise Exception('Failed getting platform name from cros_config') 762 return platform 763 764 def get_model_name(self): 765 """Get the model name of the current system. 766 767 @return: A string of the model name. 768 """ 769 model = cros_config.call_cros_config_get_output( 770 '/ name', self._os_if.run_shell_command_get_result) 771 if not model: 772 raise Exception('Failed getting model name from cros_config') 773 return model 774 775 def dev_tpm_present(self): 776 """Check if /dev/tpm0 is present. 777 778 @return: Boolean true or false. 779 """ 780 return os.path.exists('/dev/tpm0') 781 782 def get_crossystem_value(self, key): 783 """Get crossystem value of the requested key. 784 785 @param key: A crossystem key. 786 @return: A string of the requested crossystem value. 787 """ 788 return self._os_if.run_shell_command_get_output( 789 'crossystem %s' % key)[0] 790 791 def get_root_dev(self): 792 """Get the name of root device without partition number. 793 794 @return: A string of the root device without partition number. 795 """ 796 return self._os_if.get_root_dev() 797 798 def get_root_part(self): 799 """Get the name of root device with partition number. 800 801 @return: A string of the root device with partition number. 802 """ 803 return self._os_if.get_root_part() 804 805 def set_try_fw_b(self, count=1): 806 """Set 'Try Firmware B' flag in crossystem. 807 808 @param count: # times to try booting into FW B 809 """ 810 self._os_if.cs.fwb_tries = count 811 812 def set_fw_try_next(self, next, count=0): 813 """Set fw_try_next to A or B. 814 815 @param next: Next FW to reboot to (A or B) 816 @param count: # of times to try booting into FW <next> 817 """ 818 self._os_if.cs.fw_try_next = next 819 if count: 820 self._os_if.cs.fw_try_count = count 821 822 def get_fw_vboot2(self): 823 """Get fw_vboot2.""" 824 try: 825 return self._os_if.cs.fw_vboot2 == '1' 826 except os_interface.OSInterfaceError: 827 return False 828 829 def request_recovery_boot(self): 830 """Request running in recovery mode on the restart.""" 831 self._os_if.cs.request_recovery() 832 833 def get_dev_boot_usb(self): 834 """Get dev_boot_usb value which controls developer mode boot from USB. 835 836 @return: True if enable, False if disable. 837 """ 838 return self._os_if.cs.dev_boot_usb == '1' 839 840 def set_dev_boot_usb(self, value): 841 """Set dev_boot_usb value which controls developer mode boot from USB. 842 843 @param value: True to enable, False to disable. 844 """ 845 self._os_if.cs.dev_boot_usb = 1 if value else 0 846 847 def is_removable_device_boot(self): 848 """Check the current boot device is removable. 849 850 @return: True: if a removable device boots. 851 False: if a non-removable device boots. 852 """ 853 root_part = self._os_if.get_root_part() 854 return self._os_if.is_removable_device(root_part) 855 856 def get_internal_device(self): 857 """Get the internal disk by given the current disk.""" 858 root_part = self._os_if.get_root_part() 859 return self._os_if.get_internal_disk(root_part) 860 861 def create_temp_dir(self, prefix='backup_'): 862 """Create a temporary directory and return the path.""" 863 return tempfile.mkdtemp(prefix=prefix) 864 865 def remove_file(self, file_path): 866 """Remove the file.""" 867 return self._os_if.remove_file(file_path) 868 869 def remove_dir(self, dir_path): 870 """Remove the directory.""" 871 return self._os_if.remove_dir(dir_path) 872 873 def check_keys(self, expected_sequence): 874 """Check the keys sequence was as expected. 875 876 @param expected_sequence: A list of expected key sequences. 877 """ 878 return self._key_checker.check_keys(expected_sequence) 879 880 881class TpmServicer(object): 882 """Class to service all TPM RPCs""" 883 884 def __init__(self, os_if): 885 """ 886 @type os_if: os_interface.OSInterface 887 """ 888 self._os_if = os_if 889 890 # This attribute is accessed via a property, so it can load lazily 891 # when actually used by the test. 892 self._real_tpm_handler = tpm_handler.TpmHandler(self._os_if) 893 894 @property 895 def _tpm_handler(self): 896 """Handler for the TPM 897 898 @rtype: tpm_handler.TpmHandler 899 """ 900 if not self._real_tpm_handler.initialized: 901 self._real_tpm_handler.init() 902 return self._real_tpm_handler 903 904 def get_firmware_version(self): 905 """Retrieve tpm firmware body version.""" 906 return self._tpm_handler.get_fw_version() 907 908 def get_firmware_datakey_version(self): 909 """Retrieve tpm firmware data key version.""" 910 return self._tpm_handler.get_fw_key_version() 911 912 def get_kernel_version(self): 913 """Retrieve tpm kernel body version.""" 914 return self._tpm_handler.get_kernel_version() 915 916 def get_kernel_datakey_version(self): 917 """Retrieve tpm kernel data key version.""" 918 return self._tpm_handler.get_kernel_key_version() 919 920 def get_tpm_version(self): 921 """Returns '1.2' or '2.0' as a string.""" 922 # tpmc can return this without stopping daemons, so access real handler. 923 return self._real_tpm_handler.get_tpm_version() 924 925 def stop_daemon(self): 926 """Stop tpm related daemon.""" 927 return self._tpm_handler.stop_daemon() 928 929 def restart_daemon(self): 930 """Restart tpm related daemon which was stopped by stop_daemon().""" 931 return self._tpm_handler.restart_daemon() 932 933 934class UpdaterServicer(object): 935 """Class to service all Updater RPCs""" 936 937 def __init__(self, os_if): 938 """ 939 @type os_if: os_interface.OSInterface 940 """ 941 self._os_if = os_if 942 self._real_updater = firmware_updater.FirmwareUpdater(self._os_if) 943 944 @property 945 def _updater(self): 946 """Handler for the updater 947 948 @rtype: firmware_updater.FirmwareUpdater 949 """ 950 if not self._real_updater.initialized: 951 self._real_updater.init() 952 return self._real_updater 953 954 def cleanup(self): 955 """Clean up the temporary directory""" 956 # Use the updater directly, to avoid initializing it just to clean it up 957 self._real_updater.cleanup_temp_dir() 958 959 def stop_daemon(self): 960 """Stop update-engine daemon.""" 961 return self._real_updater.stop_daemon() 962 963 def start_daemon(self): 964 """Start update-engine daemon.""" 965 return self._real_updater.start_daemon() 966 967 def get_section_fwid(self, target='bios', section=None): 968 """Retrieve shellball's RW or RO fwid.""" 969 return self._updater.get_section_fwid(target, section) 970 971 def get_all_fwids(self, target='bios'): 972 """Retrieve shellball's RW and/or RO fwids for all sections.""" 973 return self._updater.get_all_fwids(target) 974 975 def get_all_installed_fwids(self, target='bios', filename=None): 976 """Retrieve installed (possibly emulated) fwids for the target.""" 977 return self._updater.get_all_installed_fwids(target, filename) 978 979 def modify_fwids(self, target='bios', sections=None): 980 """Modify the fwid in the image, but don't flash it.""" 981 return self._updater.modify_fwids(target, sections) 982 983 def modify_ecid_and_flash_to_bios(self): 984 """Modify ecid, put it to AP firmware, and flash it to the system.""" 985 self._updater.modify_ecid_and_flash_to_bios() 986 987 def corrupt_diagnostics_image(self, local_filename): 988 """Corrupts a diagnostics image in the CBFS working directory. 989 990 @param local_filename: Filename for storing the diagnostics image in the 991 CBFS working directory 992 """ 993 self._updater.corrupt_diagnostics_image(local_filename) 994 995 def get_ec_hash(self): 996 """Return the hex string of the EC hash.""" 997 blob = self._updater.get_ec_hash() 998 # Format it to a hex string 999 return ''.join('%02x' % ord(c) for c in blob) 1000 1001 def resign_firmware(self, version): 1002 """Resign firmware with version. 1003 1004 @param version: new version number. 1005 """ 1006 self._updater.resign_firmware(version) 1007 1008 def extract_shellball(self, append=None): 1009 """Extract shellball with the given append suffix. 1010 1011 @param append: use for the shellball name. 1012 """ 1013 return self._updater.extract_shellball(append) 1014 1015 def repack_shellball(self, append=None): 1016 """Repack shellball with new fwid. 1017 1018 @param append: use for the shellball name. 1019 """ 1020 return self._updater.repack_shellball(append) 1021 1022 def reset_shellball(self): 1023 """Revert to the stock shellball""" 1024 self._updater.reset_shellball() 1025 1026 def reload_images(self): 1027 """Reload handlers from the on-disk images, in case they've changed.""" 1028 self._updater.reload_images() 1029 1030 def run_firmwareupdate(self, mode, append=None, options=()): 1031 """Run updater with the given options 1032 1033 @param mode: mode for the updater 1034 @param append: extra string appended to shellball filename to run 1035 @param options: options for chromeos-firmwareupdate 1036 @return: returncode of the updater 1037 @rtype: int 1038 """ 1039 return self._updater.run_firmwareupdate(mode, append, options) 1040 1041 def run_autoupdate(self, append): 1042 """Run chromeos-firmwareupdate with autoupdate mode.""" 1043 options = ['--noupdate_ec', '--wp=1'] 1044 self._updater.run_firmwareupdate( 1045 mode='autoupdate', append=append, options=options) 1046 1047 def run_factory_install(self): 1048 """Run chromeos-firmwareupdate with factory_install mode.""" 1049 options = ['--noupdate_ec', '--wp=0'] 1050 self._updater.run_firmwareupdate( 1051 mode='factory_install', options=options) 1052 1053 def run_bootok(self, append): 1054 """Run chromeos-firmwareupdate with bootok mode.""" 1055 self._updater.run_firmwareupdate(mode='bootok', append=append) 1056 1057 def run_recovery(self): 1058 """Run chromeos-firmwareupdate with recovery mode.""" 1059 options = ['--noupdate_ec', '--nocheck_keys', '--force', '--wp=1'] 1060 self._updater.run_firmwareupdate(mode='recovery', options=options) 1061 1062 def cbfs_setup_work_dir(self): 1063 """Sets up cbfstool work directory.""" 1064 return self._updater.cbfs_setup_work_dir() 1065 1066 def cbfs_extract_chip(self, fw_name): 1067 """Runs cbfstool to extract chip firmware. 1068 1069 @param fw_name: Name of chip firmware to extract. 1070 @return: Boolean success status. 1071 """ 1072 return self._updater.cbfs_extract_chip(fw_name) 1073 1074 def cbfs_extract_diagnostics(self, diag_name, local_filename): 1075 """Runs cbfstool to extract a diagnostics image. 1076 1077 @param diag_name: Name of the diagnostics image in CBFS 1078 @param local_filename: Filename for storing the diagnostics image in the 1079 CBFS working directory 1080 """ 1081 self._updater.cbfs_extract_diagnostics(diag_name, local_filename) 1082 1083 def cbfs_replace_diagnostics(self, diag_name, local_filename): 1084 """Runs cbfstool to replace a diagnostics image in the firmware image. 1085 1086 @param diag_name: Name of the diagnostics image in CBFS 1087 @param local_filename: Filename for storing the diagnostics image in the 1088 CBFS working directory 1089 """ 1090 self._updater.cbfs_replace_diagnostics(diag_name, local_filename) 1091 1092 def cbfs_get_chip_hash(self, fw_name): 1093 """Gets the chip firmware hash blob. 1094 1095 @param fw_name: Name of chip firmware whose hash blob to return. 1096 @return: Hex string of hash blob. 1097 """ 1098 return self._updater.cbfs_get_chip_hash(fw_name) 1099 1100 def cbfs_replace_chip(self, fw_name): 1101 """Runs cbfstool to replace chip firmware. 1102 1103 @param fw_name: Name of chip firmware to extract. 1104 @return: Boolean success status. 1105 """ 1106 return self._updater.cbfs_replace_chip(fw_name) 1107 1108 def cbfs_sign_and_flash(self): 1109 """Runs cbfs signer and flash it. 1110 1111 @param fw_name: Name of chip firmware to extract. 1112 @return: Boolean success status. 1113 """ 1114 return self._updater.cbfs_sign_and_flash() 1115 1116 def get_temp_path(self): 1117 """Get updater's temp directory path.""" 1118 return self._updater.get_temp_path() 1119 1120 def get_keys_path(self): 1121 """Get updater's keys directory path.""" 1122 return self._updater.get_keys_path() 1123 1124 def get_work_path(self): 1125 """Get updater's work directory path.""" 1126 return self._updater.get_work_path() 1127 1128 def get_bios_relative_path(self): 1129 """Gets the relative path of the bios image in the shellball.""" 1130 return self._updater.get_bios_relative_path() 1131 1132 def get_ec_relative_path(self): 1133 """Gets the relative path of the ec image in the shellball.""" 1134 return self._updater.get_ec_relative_path() 1135 1136 def copy_bios(self, filename): 1137 """Make a copy of the shellball bios.bin""" 1138 return self._updater.copy_bios(filename) 1139 1140 def get_image_gbb_flags(self, filename=None): 1141 """Get the GBB flags in the given image (shellball image if unspecified) 1142 1143 @param filename: the image path to act on (None to use shellball image) 1144 @return: An integer of the GBB flags. 1145 """ 1146 return self._updater.get_image_gbb_flags(filename) 1147 1148 def set_image_gbb_flags(self, flags, filename=None): 1149 """Set the GBB flags in the given image (shellball image if unspecified) 1150 1151 @param flags: the flags to set 1152 @param filename: the image path to act on (None to use shellball image) 1153 1154 @type flags: int 1155 @type filename: str | None 1156 """ 1157 return self._updater.set_image_gbb_flags(flags, filename) 1158