1# Copyright (c) 2013 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4"""Code to provide functions for FAFT tests. 5 6These will be exposed via an xmlrpc server running on the DUT. 7 8@note: When adding categories, please also update server/cros/faft/rpc_proxy.pyi 9""" 10 11from __future__ import print_function 12 13import binascii 14import httplib 15import logging 16import os 17import signal 18import six 19import sys 20import tempfile 21import traceback 22import xmlrpclib 23 24from autotest_lib.client.common_lib import lsbrelease_utils 25from autotest_lib.client.common_lib.cros import cros_config 26from autotest_lib.client.cros import xmlrpc_server 27from autotest_lib.client.cros.faft.utils import ( 28 cgpt_handler, 29 os_interface, 30 firmware_check_keys, 31 firmware_updater, 32 flashrom_handler, 33 kernel_handler, 34 rootfs_handler, 35 tpm_handler, 36) 37 38 39class FaftXmlRpcDelegate(xmlrpc_server.XmlRpcDelegate): 40 """ 41 A class which routes RPC methods to the proper servicers. 42 43 Firmware tests are able to call an RPC method via: 44 <FAFTClient>.[category].[method_name](params) 45 When XML-RPC is being used, the RPC server routes the called method to: 46 <XmlRpcDelegate>._dispatch('[category].[method_name]', params) 47 The method is then dispatched to a Servicer class. 48 """ 49 50 def __init__(self, os_if): 51 """Initialize the servicer for each category. 52 53 @type os_if: os_interface.OSInterface 54 """ 55 self._ready = False 56 self.bios = BiosServicer(os_if) 57 self.cgpt = CgptServicer(os_if) 58 self.ec = EcServicer(os_if) 59 self.kernel = KernelServicer(os_if) 60 self.rootfs = RootfsServicer(os_if) 61 self.rpc_settings = RpcSettingsServicer(os_if) 62 self.system = SystemServicer(os_if) 63 self.tpm = TpmServicer(os_if) 64 self.updater = UpdaterServicer(os_if) 65 66 self._rpc_servicers = { 67 'bios': self.bios, 68 'cgpt': self.cgpt, 69 'ec': self.ec, 70 'kernel': self.kernel, 71 'rpc_settings': self.rpc_settings, 72 'rootfs': self.rootfs, 73 'system': self.system, 74 'tpm': self.tpm, 75 'updater': self.updater 76 } 77 78 self._os_if = os_if 79 80 def __enter__(self): 81 """Enter the the delegate context (when XmlRpcServer.run() starts). 82 83 The server is marked ready here, rather than immediately when created. 84 """ 85 logging.debug("%s: Serving FAFT functions", self.__class__.__name__) 86 self._ready = True 87 self._os_if.start_file_logging() 88 89 def __exit__(self, exception, value, traceback): 90 """Exit the delegate context (when XmlRpcServer.run() finishes). 91 92 The server is marked not ready, to prevent the client from using 93 the wrong server when quitting one instance and starting another. 94 """ 95 self._ready = False 96 self._os_if.stop_file_logging() 97 logging.debug("%s: Done.", self.__class__.__name__) 98 99 def quit(self): 100 """Exit the xmlrpc server.""" 101 self._ready = False 102 os.kill(os.getpid(), signal.SIGINT) 103 104 def ready(self): 105 """Is the RPC server ready to serve calls in a useful manner? 106 107 The server is only marked ready during the XmlRpcServer.run() loop. 108 This method suppresses the extra logging of ready() from the superclass. 109 """ 110 return self._ready 111 112 def _report_error(self, fault_code, message, exc_info=False): 113 """Raise the given RPC error text, including information about last 114 exception from sys.exc_info(). The log file gets the traceback in text; 115 the raised exception keeps the old traceback (but not in text). 116 117 Note: this must be called right after the original exception, or it may 118 report the wrong exception. 119 120 @raise: xmlrpclib.Fault 121 122 @param fault_code: the status code to use 123 @param message: the string message to include before exception text 124 @param exc_info: true to use the tuple from sys.exc_info() 125 @return the exception to raise 126 127 @type fault_code: int 128 @type message: str 129 @type exc_info: bool 130 @rtype: Exception 131 """ 132 if exc_info: 133 tb = None 134 try: 135 (exc_class, exc, tb) = sys.exc_info() 136 137 tb_str = ''.join( 138 traceback.format_exception(exc_class, exc, tb)) 139 self._os_if.log('Error: %s.\n%s' % (message, tb_str.rstrip())) 140 141 if not isinstance(exc, xmlrpclib.Fault): 142 exc_str = ''.join( 143 traceback.format_exception_only(exc_class, exc)) 144 exc = xmlrpclib.Fault( 145 fault_code, '%s. %s' % (message, exc_str.rstrip())) 146 six.reraise(exc, None, tb) 147 finally: 148 del exc_info 149 del tb 150 else: 151 self._os_if.log('Error: %s' % message) 152 return xmlrpclib.Fault(fault_code, message) 153 154 def _dispatch(self, called_method, params): 155 """ 156 Send any RPC call to the appropriate servicer method. 157 158 @param called_method: The method of FAFTClient that was called. 159 Should take the form 'category.method'. 160 @param params: The arguments passed into the method. 161 162 @type called_method: str 163 @type params: tuple 164 165 @raise: xmlrpclib.Fault (using http error codes for fault codes) 166 """ 167 self._os_if.log('Called: %s%s' % (called_method, params)) 168 169 name_pieces = called_method.split('.') 170 171 if not name_pieces: 172 raise self._report_error( 173 httplib.BAD_REQUEST, 174 'RPC request is invalid (completely empty): "%s"' % 175 called_method) 176 177 method_name = name_pieces.pop() 178 category = '.'.join(name_pieces) 179 180 if (method_name.startswith('_') 181 and method_name not in ('__str__', '__repr__', '__call__')): 182 # *._private() or *.__special__() 183 # Forbid early, to prevent seeing which methods exist. 184 raise self._report_error( 185 httplib.FORBIDDEN, 186 'RPC method name is private: %s%s[%s]' % 187 (category, '.' if category else '', method_name)) 188 189 elif not method_name: 190 # anything.() 191 raise self._report_error( 192 httplib.BAD_REQUEST, 193 'RPC method name is empty: %s%s[%s]' % 194 (category, '.' if category else '', method_name)) 195 196 if category in self._rpc_servicers: 197 # system.func() 198 holder = self._rpc_servicers[category] 199 if not hasattr(holder, method_name): 200 raise self._report_error( 201 httplib.NOT_FOUND, 202 'RPC method not found: %s.[%s]' % 203 (category, method_name)) 204 205 elif category: 206 # invalid.func() 207 raise self._report_error( 208 httplib.NOT_FOUND, 209 'RPC category not found: [%s].%s' % 210 (category, method_name)) 211 212 else: 213 # .func() or .invalid() 214 holder = self 215 if not hasattr(holder, method_name): 216 raise self._report_error( 217 httplib.NOT_FOUND, 218 'RPC method not found: [%s]' % method_name) 219 220 try: 221 method = getattr(holder, method_name) 222 223 except AttributeError: 224 raise self._report_error( 225 httplib.NOT_IMPLEMENTED, 226 'RPC method not found: "%s"' % called_method, exc_info=True) 227 228 try: 229 return method(*params) 230 231 except Exception: 232 raise self._report_error( 233 httplib.INTERNAL_SERVER_ERROR, 234 'RPC call failed: %s()' % called_method, exc_info=True) 235 236 237class BiosServicer(object): 238 """Class to service all BIOS RPCs""" 239 240 def __init__(self, os_if): 241 """ 242 @type os_if: os_interface.OSInterface 243 """ 244 self._os_if = os_if 245 246 # This attribute is accessed via a property, so it can load lazily 247 # when actually used by the test. 248 self._real_bios_handler = flashrom_handler.FlashromHandler( 249 self._os_if, None, '/usr/share/vboot/devkeys', 'bios') 250 251 @property 252 def _bios_handler(self): 253 """Return the BIOS flashrom handler, after initializing it if necessary 254 255 @rtype: flashrom_handler.FlashromHandler 256 """ 257 if not self._real_bios_handler.initialized: 258 self._real_bios_handler.init() 259 return self._real_bios_handler 260 261 def reload(self): 262 """Reload the firmware image that may be changed.""" 263 self._bios_handler.new_image() 264 265 def get_gbb_flags(self): 266 """Get the GBB flags. 267 268 @return: An integer of the GBB flags. 269 """ 270 return self._bios_handler.get_gbb_flags() 271 272 def set_gbb_flags(self, flags): 273 """Set the GBB flags. 274 275 @param flags: An integer of the GBB flags. 276 """ 277 self._bios_handler.set_gbb_flags(flags, write_through=True) 278 279 def get_preamble_flags(self, section): 280 """Get the preamble flags of a firmware section. 281 282 @param section: A firmware section, either 'a' or 'b'. 283 @return: An integer of the preamble flags. 284 """ 285 return self._bios_handler.get_section_flags(section) 286 287 def set_preamble_flags(self, section, flags): 288 """Set the preamble flags of a firmware section. 289 290 @param section: A firmware section, either 'a' or 'b'. 291 @param flags: An integer of preamble flags. 292 """ 293 version = self.get_version(section) 294 self._bios_handler.set_section_version( 295 section, version, flags, write_through=True) 296 297 def get_body_sha(self, section): 298 """Get SHA1 hash of BIOS RW firmware section. 299 300 @param section: A firmware section, either 'a' or 'b'. 301 @return: A string of the body SHA1 hash. 302 """ 303 return self._bios_handler.get_section_sha(section) 304 305 def get_sig_sha(self, section): 306 """Get SHA1 hash of firmware vblock in section. 307 308 @param section: A firmware section, either 'a' or 'b'. 309 @return: A string of the sig SHA1 hash. 310 """ 311 return self._bios_handler.get_section_sig_sha(section) 312 313 def get_section_fwid(self, section=None): 314 """Retrieve the RO or RW fwid. 315 316 @param section: A firmware section, either 'a' or 'b'. 317 @return: A string of the fwid 318 """ 319 return self._bios_handler.get_section_fwid(section) 320 321 def corrupt_sig(self, section): 322 """Corrupt the requested firmware section signature. 323 324 @param section: A firmware section, either 'a' or 'b'. 325 """ 326 self._bios_handler.corrupt_firmware(section) 327 328 def restore_sig(self, section): 329 """Restore the previously corrupted firmware section signature. 330 331 @param section: A firmware section, either 'a' or 'b'. 332 """ 333 self._bios_handler.restore_firmware(section) 334 335 def corrupt_body(self, section, corrupt_all=False): 336 """Corrupt the requested firmware section body. 337 338 @param section: A firmware section, either 'a' or 'b'. 339 @param corrupt_all (optional): Corrupt all bytes of the fw section, 340 rather than just one byte. 341 """ 342 self._bios_handler.corrupt_firmware_body(section, corrupt_all) 343 344 def restore_body(self, section): 345 """Restore the previously corrupted firmware section body. 346 347 @param section: A firmware section, either 'a' or 'b'. 348 """ 349 self._bios_handler.restore_firmware_body(section) 350 351 def _modify_version(self, section, delta): 352 """Modify firmware version for the requested section, by adding delta. 353 354 The passed in delta, a positive or a negative number, is added to the 355 original firmware version. 356 """ 357 original_version = self.get_version(section) 358 new_version = original_version + delta 359 flags = self._bios_handler.get_section_flags(section) 360 self._os_if.log('Setting firmware section %s version from %d to %d' % 361 (section, original_version, new_version)) 362 self._bios_handler.set_section_version( 363 section, new_version, flags, write_through=True) 364 365 def move_version_backward(self, section): 366 """Decrement firmware version for the requested section.""" 367 self._modify_version(section, -1) 368 369 def move_version_forward(self, section): 370 """Increase firmware version for the requested section.""" 371 self._modify_version(section, 1) 372 373 def get_version(self, section): 374 """Retrieve firmware version of a section.""" 375 return self._bios_handler.get_section_version(section) 376 377 def get_datakey_version(self, section): 378 """Return firmware data key version.""" 379 return self._bios_handler.get_section_datakey_version(section) 380 381 def get_kernel_subkey_version(self, section): 382 """Return kernel subkey version.""" 383 return self._bios_handler.get_section_kernel_subkey_version(section) 384 385 def dump_whole(self, bios_path): 386 """Dump the current BIOS firmware to a file, specified by bios_path. 387 388 @param bios_path: The path of the BIOS image to be written. 389 """ 390 self._bios_handler.dump_whole(bios_path) 391 392 def write_whole(self, bios_path): 393 """Write the firmware from bios_path to the current system. 394 395 @param bios_path: The path of the source BIOS image 396 """ 397 self._bios_handler.new_image(bios_path) 398 self._bios_handler.write_whole() 399 400 def strip_modified_fwids(self): 401 """Strip trailing suffixes out of the FWIDs (see modify_image_fwids). 402 403 @return: a dict of any fwids that were adjusted, by section (ro, a, b) 404 @rtype: dict 405 """ 406 return self._bios_handler.strip_modified_fwids() 407 408 def set_write_protect_region(self, region, enabled=None): 409 """Modify software write protect region and flag in one operation. 410 411 @param region: Region to set (usually WP_RO) 412 @param enabled: If True, run --wp-enable; if False, run --wp-disable. 413 If None (default), don't specify either one. 414 """ 415 self._bios_handler.set_write_protect_region(region, enabled) 416 417 def set_write_protect_range(self, start, length, enabled=None): 418 """Modify software write protect range and flag in one operation. 419 420 @param start: offset (bytes) from start of flash to start of range 421 @param length: offset (bytes) from start of range to end of range 422 @param enabled: If True, run --wp-enable; if False, run --wp-disable. 423 If None (default), don't specify either one. 424 """ 425 self._bios_handler.set_write_protect_range(start, length, enabled) 426 427 def get_write_protect_status(self): 428 """Get a dict describing the status of the write protection 429 430 @return: {'enabled': True/False, 'start': '0x0', 'length': '0x0', ...} 431 @rtype: dict 432 """ 433 return self._bios_handler.get_write_protect_status() 434 435 def is_available(self): 436 """Return True if available, False if not.""" 437 # Use the real handler, to avoid .init() raising an exception 438 return self._real_bios_handler.is_available() 439 440 def get_write_cmd(self, image=None): 441 """Get the command needed to write the whole image to the device. 442 443 @param image: the filename (empty to use current handler data) 444 """ 445 if image: 446 # Don't bother loading the usual image, since it's overridden. 447 return self._real_bios_handler.get_write_cmd(image) 448 else: 449 return self._bios_handler.get_write_cmd() 450 451class CgptServicer(object): 452 """Class to service all CGPT RPCs""" 453 454 def __init__(self, os_if): 455 """ 456 @type os_if: os_interface.OSInterface 457 """ 458 self._os_if = os_if 459 self._cgpt_handler = cgpt_handler.CgptHandler(self._os_if) 460 461 def get_attributes(self): 462 """Get kernel attributes.""" 463 rootdev = self._os_if.get_root_dev() 464 self._cgpt_handler.read_device_info(rootdev) 465 return { 466 'A': self._cgpt_handler.get_partition(rootdev, 'KERN-A'), 467 'B': self._cgpt_handler.get_partition(rootdev, 'KERN-B') 468 } 469 470 def set_attributes(self, a=None, b=None): 471 """Set kernel attributes for either partition (or both).""" 472 partitions = {'A': a, 'B': b} 473 rootdev = self._os_if.get_root_dev() 474 modifiable_attributes = self._cgpt_handler.ATTR_TO_COMMAND.keys() 475 for partition_name in partitions.keys(): 476 partition = partitions[partition_name] 477 if partition is None: 478 continue 479 attributes_to_set = { 480 key: partition[key] 481 for key in modifiable_attributes 482 } 483 if attributes_to_set: 484 self._cgpt_handler.set_partition( 485 rootdev, 'KERN-%s' % partition_name, attributes_to_set) 486 487 488class EcServicer(object): 489 """Class to service all EC RPCs""" 490 491 def __init__(self, os_if): 492 """ 493 @type os_if: os_interface.OSInterface 494 """ 495 self._os_if = os_if 496 497 # This attribute is accessed via a property, so it can load lazily 498 # when actually used by the test. 499 self._real_ec_handler = None 500 ec_status = self._os_if.run_shell_command_get_status('mosys ec info') 501 if ec_status == 0: 502 self._real_ec_handler = flashrom_handler.FlashromHandler( 503 self._os_if, 'ec_root_key.vpubk', 504 '/usr/share/vboot/devkeys', 'ec') 505 506 else: 507 self._os_if.log('No EC is reported by mosys (rc=%s).' % ec_status) 508 509 @property 510 def _ec_handler(self): 511 """Return the EC flashrom handler, after initializing it if necessary 512 513 @rtype: flashrom_handler.FlashromHandler 514 """ 515 if not self._real_ec_handler: 516 # No EC handler if board has no EC 517 return None 518 519 if not self._real_ec_handler.initialized: 520 self._real_ec_handler.init() 521 return self._real_ec_handler 522 523 def reload(self): 524 """Reload the firmware image that may be changed.""" 525 self._ec_handler.new_image() 526 527 def get_version(self): 528 """Get EC version via mosys. 529 530 @return: A string of the EC version. 531 """ 532 return self._os_if.run_shell_command_get_output( 533 'mosys ec info | sed "s/.*| //"')[0] 534 535 def get_active_hash(self): 536 """Get hash of active EC RW firmware.""" 537 return self._os_if.run_shell_command_get_output( 538 'ectool echash | grep hash: | sed "s/hash:\s\+//"')[0] 539 540 def dump_whole(self, ec_path): 541 """Dump the current EC firmware to a file, specified by ec_path. 542 543 @param ec_path: The path of the EC image to be written. 544 """ 545 self._ec_handler.dump_whole(ec_path) 546 547 def write_whole(self, ec_path): 548 """Write the firmware from ec_path to the current system. 549 550 @param ec_path: The path of the source EC image. 551 """ 552 self._ec_handler.new_image(ec_path) 553 self._ec_handler.write_whole() 554 555 def corrupt_body(self, section): 556 """Corrupt the requested EC section body. 557 558 @param section: An EC section, either 'a' or 'b'. 559 """ 560 self._ec_handler.corrupt_firmware_body(section, corrupt_all=True) 561 562 def dump_firmware(self, ec_path): 563 """Dump the current EC firmware to a file, specified by ec_path. 564 565 @param ec_path: The path of the EC image to be written. 566 """ 567 self._ec_handler.dump_whole(ec_path) 568 569 def set_write_protect(self, enable): 570 """Enable write protect of the EC flash chip. 571 572 @param enable: True if activating EC write protect. Otherwise, False. 573 """ 574 if enable: 575 self._ec_handler.enable_write_protect() 576 else: 577 self._ec_handler.disable_write_protect() 578 579 def get_write_protect_status(self): 580 """Get a dict describing the status of the write protection 581 582 @return: {'enabled': True/False, 'start': '0x0', 'length': '0x0', ...} 583 @rtype: dict 584 """ 585 return self._ec_handler.get_write_protect_status() 586 587 def is_efs(self): 588 """Return True if the EC supports EFS.""" 589 return self._ec_handler.has_section_body('rw_b') 590 591 def copy_rw(self, from_section, to_section): 592 """Copy EC RW from from_section to to_section.""" 593 self._ec_handler.copy_from_to(from_section, to_section) 594 595 def reboot_to_switch_slot(self): 596 """Reboot EC to switch the active RW slot.""" 597 self._os_if.run_shell_command( 598 'ectool reboot_ec cold switch-slot', modifies_device=True) 599 600 def strip_modified_fwids(self): 601 """Strip trailing suffixes out of the FWIDs (see modify_image_fwids).""" 602 return self._ec_handler.strip_modified_fwids() 603 604 def get_write_cmd(self, image=None): 605 """Get the command needed to write the whole image to the device. 606 607 @param image: the filename (empty to use current handler data) 608 """ 609 if image: 610 # Don't bother loading the usual image, since it's overridden. 611 return self._real_ec_handler.get_write_cmd(image) 612 else: 613 return self._ec_handler.get_write_cmd() 614 615 616class KernelServicer(object): 617 """Class to service all Kernel RPCs""" 618 619 def __init__(self, os_if): 620 """ 621 @type os_if: os_interface.OSInterface 622 """ 623 self._os_if = os_if 624 self._real_kernel_handler = kernel_handler.KernelHandler(self._os_if) 625 626 @property 627 def _kernel_handler(self): 628 """Return the kernel handler, after initializing it if necessary 629 630 @rtype: kernel_handler.KernelHandler 631 """ 632 if not self._real_kernel_handler.initialized: 633 self._real_kernel_handler.init( 634 dev_key_path='/usr/share/vboot/devkeys', 635 internal_disk=True) 636 return self._real_kernel_handler 637 638 def corrupt_sig(self, section): 639 """Corrupt the requested kernel section. 640 641 @param section: A kernel section, either 'a' or 'b'. 642 """ 643 self._kernel_handler.corrupt_kernel(section) 644 645 def restore_sig(self, section): 646 """Restore the requested kernel section (previously corrupted). 647 648 @param section: A kernel section, either 'a' or 'b'. 649 """ 650 self._kernel_handler.restore_kernel(section) 651 652 def _modify_version(self, section, delta): 653 """Modify kernel version for the requested section, by adding delta. 654 655 The passed in delta, a positive or a negative number, is added to the 656 original kernel version. 657 """ 658 original_version = self._kernel_handler.get_version(section) 659 new_version = original_version + delta 660 self._os_if.log('Setting kernel section %s version from %d to %d' % 661 (section, original_version, new_version)) 662 self._kernel_handler.set_version(section, new_version) 663 664 def move_version_backward(self, section): 665 """Decrement kernel version for the requested section.""" 666 self._modify_version(section, -1) 667 668 def move_version_forward(self, section): 669 """Increase kernel version for the requested section.""" 670 self._modify_version(section, 1) 671 672 def get_version(self, section): 673 """Return kernel version.""" 674 return self._kernel_handler.get_version(section) 675 676 def get_datakey_version(self, section): 677 """Return kernel datakey version.""" 678 return self._kernel_handler.get_datakey_version(section) 679 680 def diff_a_b(self): 681 """Compare kernel A with B. 682 683 @return: True: if kernel A is different with B. 684 False: if kernel A is the same as B. 685 """ 686 rootdev = self._os_if.get_root_dev() 687 kernel_a = self._os_if.join_part(rootdev, '2') 688 kernel_b = self._os_if.join_part(rootdev, '4') 689 690 # The signature (some kind of hash) for the kernel body is stored in 691 # the beginning. So compare the first 64KB (including header, preamble, 692 # and signature) should be enough to check them identical. 693 header_a = self._os_if.read_partition(kernel_a, 0x10000) 694 header_b = self._os_if.read_partition(kernel_b, 0x10000) 695 696 return header_a != header_b 697 698 def resign_with_keys(self, section, key_path=None): 699 """Resign kernel with temporary key.""" 700 self._kernel_handler.resign_kernel(section, key_path) 701 702 def dump(self, section, kernel_path): 703 """Dump the specified kernel to a file. 704 705 @param section: The kernel to dump. May be A or B. 706 @param kernel_path: The path to the kernel image to be written. 707 """ 708 self._kernel_handler.dump_kernel(section, kernel_path) 709 710 def write(self, section, kernel_path): 711 """Write a kernel image to the specified section. 712 713 @param section: The kernel to dump. May be A or B. 714 @param kernel_path: The path to the kernel image. 715 """ 716 self._kernel_handler.write_kernel(section, kernel_path) 717 718 def get_sha(self, section): 719 """Return the SHA1 hash of the specified kernel section.""" 720 return self._kernel_handler.get_sha(section) 721 722 723class RootfsServicer(object): 724 """Class to service all RootFS RPCs""" 725 726 def __init__(self, os_if): 727 """ 728 @type os_if: os_interface.OSInterface 729 """ 730 self._os_if = os_if 731 self._real_rootfs_handler = rootfs_handler.RootfsHandler(self._os_if) 732 733 @property 734 def _rootfs_handler(self): 735 """Return the rootfs handler, after initializing it if necessary 736 737 @rtype: rootfs_handler.RootfsHandler 738 """ 739 if not self._real_rootfs_handler.initialized: 740 self._real_rootfs_handler.init() 741 return self._real_rootfs_handler 742 743 def verify_rootfs(self, section): 744 """Verifies the integrity of the root FS. 745 746 @param section: The rootfs to verify. May be A or B. 747 """ 748 return self._rootfs_handler.verify_rootfs(section) 749 750 751class RpcSettingsServicer(object): 752 """Class to service RPCs for settings of the RPC server itself""" 753 754 def __init__(self, os_if): 755 """ 756 @type os_if: os_interface.OSInterface 757 """ 758 self._os_if = os_if 759 760 def enable_test_mode(self): 761 """Enable test mode (avoids writing to flash or gpt)""" 762 self._os_if.test_mode = True 763 764 def disable_test_mode(self): 765 """Disable test mode and return to normal operation""" 766 self._os_if.test_mode = False 767 768 769class SystemServicer(object): 770 """Class to service all System RPCs""" 771 772 def __init__(self, os_if): 773 """ 774 @type os_if: os_interface.OSInterface 775 """ 776 self._os_if = os_if 777 self._key_checker = firmware_check_keys.firmwareCheckKeys() 778 779 def is_available(self): 780 """Function for polling the RPC server availability. 781 782 @return: Always True. 783 """ 784 return True 785 786 def dump_log(self, remove_log=False): 787 """Dump the log file. 788 789 @param remove_log: Remove the log file after dump. 790 @return: String of the log file content. 791 """ 792 return self._os_if.dump_log(remove_log=remove_log) 793 794 def run_shell_command(self, command, block=True): 795 """Run shell command. 796 797 @param command: A shell command to be run. 798 @param block: if True (default), wait for command to finish 799 """ 800 self._os_if.run_shell_command(command, block=block) 801 802 def run_shell_command_check_output(self, command, success_token): 803 """Run shell command and check its stdout for a string. 804 805 @param command: A shell command to be run. 806 @param success_token: A string to search the output for. 807 @return: A Boolean indicating whether the success_token was found in 808 the command output. 809 """ 810 return self._os_if.run_shell_command_check_output( 811 command, success_token) 812 813 def run_shell_command_get_output(self, command, include_stderr=False): 814 """Run shell command and get its console output. 815 816 @param command: A shell command to be run. 817 @return: A list of strings stripped of the newline characters. 818 """ 819 return self._os_if.run_shell_command_get_output(command, include_stderr) 820 821 def run_shell_command_get_status(self, command): 822 """Run shell command and get its console status. 823 824 @param command: A shell command to be run. 825 @return: The returncode of the process 826 @rtype: int 827 """ 828 return self._os_if.run_shell_command_get_status(command) 829 830 def get_platform_name(self): 831 """Get the platform name of the current system. 832 833 @return: A string of the platform name. 834 """ 835 return lsbrelease_utils.get_current_board() 836 837 def get_model_name(self): 838 """Get the model name of the current system. 839 840 @return: A string of the model name. 841 """ 842 model = cros_config.call_cros_config_get_output( 843 '/ name', self._os_if.run_shell_command_get_result) 844 if not model: 845 raise Exception('Failed getting model name from cros_config') 846 return model 847 848 def dev_tpm_present(self): 849 """Check if /dev/tpm0 is present. 850 851 @return: Boolean true or false. 852 """ 853 return os.path.exists('/dev/tpm0') 854 855 def get_crossystem_value(self, key): 856 """Get crossystem value of the requested key. 857 858 @param key: A crossystem key. 859 @return: A string of the requested crossystem value. 860 """ 861 return self._os_if.run_shell_command_get_output( 862 'crossystem %s' % key)[0] 863 864 def get_root_dev(self): 865 """Get the name of root device without partition number. 866 867 @return: A string of the root device without partition number. 868 """ 869 return self._os_if.get_root_dev() 870 871 def get_root_part(self): 872 """Get the name of root device with partition number. 873 874 @return: A string of the root device with partition number. 875 """ 876 return self._os_if.get_root_part() 877 878 def set_try_fw_b(self, count=1): 879 """Set 'Try Firmware B' flag in crossystem. 880 881 @param count: # times to try booting into FW B 882 """ 883 self._os_if.cs.fwb_tries = count 884 885 def set_fw_try_next(self, next, count=0): 886 """Set fw_try_next to A or B. 887 888 @param next: Next FW to reboot to (A or B) 889 @param count: # of times to try booting into FW <next> 890 """ 891 self._os_if.cs.fw_try_next = next 892 if count: 893 self._os_if.cs.fw_try_count = count 894 895 def get_fw_vboot2(self): 896 """Get fw_vboot2.""" 897 try: 898 return self._os_if.cs.fw_vboot2 == '1' 899 except os_interface.OSInterfaceError: 900 return False 901 902 def request_recovery_boot(self): 903 """Request running in recovery mode on the restart.""" 904 self._os_if.cs.request_recovery() 905 906 def get_dev_boot_usb(self): 907 """Get dev_boot_usb value which controls developer mode boot from USB. 908 909 @return: True if enable, False if disable. 910 """ 911 return self._os_if.cs.dev_boot_usb == '1' 912 913 def set_dev_boot_usb(self, value): 914 """Set dev_boot_usb value which controls developer mode boot from USB. 915 916 @param value: True to enable, False to disable. 917 """ 918 self._os_if.cs.dev_boot_usb = 1 if value else 0 919 920 def get_dev_default_boot(self): 921 """Get dev_default_boot value, which selects the default boot device. 922 923 @return: 'disk' or 'usb' or 'legacy' 924 """ 925 return self._os_if.cs.dev_default_boot 926 927 def set_dev_default_boot(self, device='disk'): 928 """Set dev_default_boot value, which selects the default boot device. 929 930 @param device: 'disk' or 'usb' or 'legacy' (default: 'disk') 931 """ 932 self._os_if.cs.dev_default_boot = device 933 934 def is_removable_device_boot(self): 935 """Check the current boot device is removable. 936 937 @return: True: if a removable device boots. 938 False: if a non-removable device boots. 939 """ 940 root_part = self._os_if.get_root_part() 941 return self._os_if.is_removable_device(root_part) 942 943 def get_internal_device(self): 944 """Get the internal disk by given the current disk.""" 945 root_part = self._os_if.get_root_part() 946 return self._os_if.get_internal_disk(root_part) 947 948 def create_temp_dir(self, prefix='backup_', dir=None): 949 """Create a temporary directory and return the path.""" 950 return tempfile.mkdtemp(prefix=prefix, dir=dir) 951 952 def remove_file(self, file_path): 953 """Remove the file.""" 954 return self._os_if.remove_file(file_path) 955 956 def remove_dir(self, dir_path): 957 """Remove the directory.""" 958 return self._os_if.remove_dir(dir_path) 959 960 def check_keys(self, expected_sequence): 961 """Check the keys sequence was as expected. 962 963 @param expected_sequence: A list of expected key sequences. 964 """ 965 return self._key_checker.check_keys(expected_sequence) 966 967 968class TpmServicer(object): 969 """Class to service all TPM RPCs""" 970 971 def __init__(self, os_if): 972 """ 973 @type os_if: os_interface.OSInterface 974 """ 975 self._os_if = os_if 976 977 # This attribute is accessed via a property, so it can load lazily 978 # when actually used by the test. 979 self._real_tpm_handler = tpm_handler.TpmHandler(self._os_if) 980 981 @property 982 def _tpm_handler(self): 983 """Handler for the TPM 984 985 @rtype: tpm_handler.TpmHandler 986 """ 987 if not self._real_tpm_handler.initialized: 988 self._real_tpm_handler.init() 989 return self._real_tpm_handler 990 991 def get_firmware_version(self): 992 """Retrieve tpm firmware body version.""" 993 return self._tpm_handler.get_fw_version() 994 995 def get_firmware_datakey_version(self): 996 """Retrieve tpm firmware data key version.""" 997 return self._tpm_handler.get_fw_key_version() 998 999 def get_kernel_version(self): 1000 """Retrieve tpm kernel body version.""" 1001 return self._tpm_handler.get_kernel_version() 1002 1003 def get_kernel_datakey_version(self): 1004 """Retrieve tpm kernel data key version.""" 1005 return self._tpm_handler.get_kernel_key_version() 1006 1007 def get_tpm_version(self): 1008 """Returns '1.2' or '2.0' as a string.""" 1009 # tpmc can return this without stopping daemons, so access real handler. 1010 return self._real_tpm_handler.get_tpm_version() 1011 1012 def stop_daemon(self): 1013 """Stop tpm related daemon.""" 1014 return self._tpm_handler.stop_daemon() 1015 1016 def restart_daemon(self): 1017 """Restart tpm related daemon which was stopped by stop_daemon().""" 1018 return self._tpm_handler.restart_daemon() 1019 1020 1021class UpdaterServicer(object): 1022 """Class to service all Updater RPCs""" 1023 1024 def __init__(self, os_if): 1025 """ 1026 @type os_if: os_interface.OSInterface 1027 """ 1028 self._os_if = os_if 1029 self._real_updater = firmware_updater.FirmwareUpdater(self._os_if) 1030 1031 @property 1032 def _updater(self): 1033 """Handler for the updater 1034 1035 @rtype: firmware_updater.FirmwareUpdater 1036 """ 1037 if not self._real_updater.initialized: 1038 self._real_updater.init() 1039 return self._real_updater 1040 1041 def cleanup(self): 1042 """Clean up the temporary directory""" 1043 # Use the updater directly, to avoid initializing it just to clean it up 1044 self._real_updater.cleanup_temp_dir() 1045 1046 def stop_daemon(self): 1047 """Stop update-engine daemon.""" 1048 return self._real_updater.stop_daemon() 1049 1050 def start_daemon(self): 1051 """Start update-engine daemon.""" 1052 return self._real_updater.start_daemon() 1053 1054 def get_section_fwid(self, target='bios', section=None): 1055 """Retrieve shellball's RW or RO fwid.""" 1056 return self._updater.get_section_fwid(target, section) 1057 1058 def get_device_fwids(self, target='bios'): 1059 """Retrieve flash device's fwids for the target.""" 1060 return self._updater.get_device_fwids(target) 1061 1062 def get_image_fwids(self, target='bios', filename=None): 1063 """Retrieve image file's fwids for the target.""" 1064 return self._updater.get_image_fwids(target, filename) 1065 1066 def modify_image_fwids(self, target='bios', sections=None): 1067 """Modify the fwid in the image, but don't flash it.""" 1068 return self._updater.modify_image_fwids(target, sections) 1069 1070 def modify_ecid_and_flash_to_bios(self): 1071 """Modify ecid, put it to AP firmware, and flash it to the system.""" 1072 self._updater.modify_ecid_and_flash_to_bios() 1073 1074 def corrupt_diagnostics_image(self, local_filename): 1075 """Corrupts a diagnostics image in the CBFS working directory. 1076 1077 @param local_filename: Filename for storing the diagnostics image in the 1078 CBFS working directory 1079 """ 1080 self._updater.corrupt_diagnostics_image(local_filename) 1081 1082 def get_ec_hash(self): 1083 """Return the hex string of the EC hash.""" 1084 blob = self._updater.get_ec_hash() 1085 # Format it to a hex string 1086 return binascii.hexlify(blob) 1087 1088 def resign_firmware(self, version): 1089 """Resign firmware with version. 1090 1091 @param version: new version number. 1092 """ 1093 self._updater.resign_firmware(version) 1094 1095 def extract_shellball(self, append=None): 1096 """Extract shellball with the given append suffix. 1097 1098 @param append: use for the shellball name. 1099 """ 1100 return self._updater.extract_shellball(append) 1101 1102 def repack_shellball(self, append=None): 1103 """Repack shellball with new fwid. 1104 1105 @param append: use for the shellball name. 1106 """ 1107 return self._updater.repack_shellball(append) 1108 1109 def reset_shellball(self): 1110 """Revert to the stock shellball""" 1111 self._updater.reset_shellball() 1112 1113 def reload_images(self): 1114 """Reload handlers from the on-disk images, in case they've changed.""" 1115 self._updater.reload_images() 1116 1117 def get_firmwareupdate_command(self, mode, append=None, options=()): 1118 """Get the command needed to run updater with the given options. 1119 1120 The client should run it via ssh, in case the update resets USB network. 1121 1122 @param mode: mode for the updater 1123 @param append: extra string appended to shellball filename to run 1124 @param options: options for chromeos-firmwareupdate 1125 @return: returncode of the updater 1126 @rtype: str 1127 """ 1128 return self._updater.get_firmwareupdate_command(mode, append, options) 1129 1130 def run_firmwareupdate(self, mode, append=None, options=()): 1131 """Run updater with the given options 1132 1133 @param mode: mode for the updater 1134 @param append: extra string appended to shellball filename to run 1135 @param options: options for chromeos-firmwareupdate 1136 @return: returncode of the updater 1137 @rtype: int 1138 """ 1139 return self._updater.run_firmwareupdate(mode, append, options) 1140 1141 def run_autoupdate(self, append): 1142 """Run chromeos-firmwareupdate with autoupdate mode.""" 1143 options = ['--noupdate_ec', '--wp=1'] 1144 self._updater.run_firmwareupdate( 1145 mode='autoupdate', append=append, options=options) 1146 1147 def run_factory_install(self): 1148 """Run chromeos-firmwareupdate with factory_install mode.""" 1149 options = ['--noupdate_ec', '--wp=0'] 1150 self._updater.run_firmwareupdate( 1151 mode='factory_install', options=options) 1152 1153 def run_bootok(self, append): 1154 """Run chromeos-firmwareupdate with bootok mode.""" 1155 self._updater.run_firmwareupdate(mode='bootok', append=append) 1156 1157 def run_recovery(self): 1158 """Run chromeos-firmwareupdate with recovery mode.""" 1159 options = ['--noupdate_ec', '--nocheck_keys', '--force', '--wp=1'] 1160 self._updater.run_firmwareupdate(mode='recovery', options=options) 1161 1162 def cbfs_setup_work_dir(self): 1163 """Sets up cbfstool work directory.""" 1164 return self._updater.cbfs_setup_work_dir() 1165 1166 def cbfs_extract_chip(self, 1167 fw_name, 1168 extension='.bin', 1169 hash_extension='.hash'): 1170 """Runs cbfstool to extract chip firmware. 1171 1172 @param fw_name: Name of chip firmware to extract. 1173 @return: Boolean success status. 1174 """ 1175 return self._updater.cbfs_extract_chip(fw_name, extension, 1176 hash_extension) 1177 1178 def cbfs_extract_diagnostics(self, diag_name, local_filename): 1179 """Runs cbfstool to extract a diagnostics image. 1180 1181 @param diag_name: Name of the diagnostics image in CBFS 1182 @param local_filename: Filename for storing the diagnostics image in the 1183 CBFS working directory 1184 """ 1185 self._updater.cbfs_extract_diagnostics(diag_name, local_filename) 1186 1187 def cbfs_replace_diagnostics(self, diag_name, local_filename): 1188 """Runs cbfstool to replace a diagnostics image in the firmware image. 1189 1190 @param diag_name: Name of the diagnostics image in CBFS 1191 @param local_filename: Filename for storing the diagnostics image in the 1192 CBFS working directory 1193 """ 1194 self._updater.cbfs_replace_diagnostics(diag_name, local_filename) 1195 1196 def cbfs_get_chip_hash(self, fw_name, hash_extension='.hash'): 1197 """Gets the chip firmware hash blob. 1198 1199 The hash data is returned as a list of stringified two-byte pieces: 1200 \x12\x34...\xab\xcd\xef -> ['0x12', '0x34', ..., '0xab', '0xcd', '0xef'] 1201 1202 @param fw_name: Name of chip firmware whose hash blob to return. 1203 @return: Hex string of hash blob. 1204 """ 1205 return self._updater.cbfs_get_chip_hash(fw_name, hash_extension) 1206 1207 def cbfs_replace_chip(self, 1208 fw_name, 1209 extension='.bin', 1210 hash_extension='.hash', 1211 regions=('a', 'b')): 1212 """Runs cbfstool to replace chip firmware. 1213 1214 @param fw_name: Name of chip firmware to extract. 1215 @return: Boolean success status. 1216 """ 1217 return self._updater.cbfs_replace_chip(fw_name, extension, 1218 hash_extension, regions) 1219 1220 def cbfs_sign_and_flash(self): 1221 """Runs cbfs signer and flash it. 1222 1223 @param fw_name: Name of chip firmware to extract. 1224 @return: Boolean success status. 1225 """ 1226 return self._updater.cbfs_sign_and_flash() 1227 1228 def cbfs_extract(self, 1229 filename, 1230 extension, 1231 regions, 1232 local_filename=None, 1233 arch=None, 1234 bios=None): 1235 """Extracts an arbitrary file from cbfs. 1236 1237 Note that extracting from 1238 @param filename: Filename in cbfs, including extension 1239 @param extension: Extension of the file, including '.' 1240 @param regions: Tuple of regions (the default is just 'a') 1241 @param arch: Specific machine architecture to extract (default unset) 1242 @param local_filename: Path to use on the DUT, overriding the default in 1243 the cbfs work dir. 1244 @param bios: Image from which the cbfs file to be extracted 1245 @return: The full path of the extracted file, or None 1246 """ 1247 return self._updater.cbfs_extract(filename, 1248 extension, regions, 1249 local_filename, 1250 arch, 1251 bios) 1252 1253 def get_temp_path(self): 1254 """Get updater's temp directory path.""" 1255 return self._updater.get_temp_path() 1256 1257 def get_keys_path(self): 1258 """Get updater's keys directory path.""" 1259 return self._updater.get_keys_path() 1260 1261 def get_work_path(self): 1262 """Get updater's work directory path.""" 1263 return self._updater.get_work_path() 1264 1265 def get_bios_relative_path(self): 1266 """Gets the relative path of the bios image in the shellball.""" 1267 return self._updater.get_bios_relative_path() 1268 1269 def get_ec_relative_path(self): 1270 """Gets the relative path of the ec image in the shellball.""" 1271 return self._updater.get_ec_relative_path() 1272 1273 def copy_bios(self, filename): 1274 """Make a copy of the shellball bios.bin""" 1275 return self._updater.copy_bios(filename) 1276 1277 def get_image_gbb_flags(self, filename=None): 1278 """Get the GBB flags in the given image (shellball image if unspecified) 1279 1280 @param filename: the image path to act on (None to use shellball image) 1281 @return: An integer of the GBB flags. 1282 """ 1283 return self._updater.get_image_gbb_flags(filename) 1284 1285 def set_image_gbb_flags(self, flags, filename=None): 1286 """Set the GBB flags in the given image (shellball image if unspecified) 1287 1288 @param flags: the flags to set 1289 @param filename: the image path to act on (None to use shellball image) 1290 1291 @type flags: int 1292 @type filename: str | None 1293 """ 1294 return self._updater.set_image_gbb_flags(flags, filename) 1295