# Copyright 2017 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import argparse
import logging
import os
import re

from autotest_lib.client.common_lib import error


RO = 'ro'
RW = 'rw'
CR50_FILE = '/opt/google/cr50/firmware/cr50.bin.prod'
CR50_STATE = '/var/cache/cr50*'
GET_CR50_VERSION = 'cat /var/cache/cr50-version'
GET_CR50_MESSAGES = r'grep "cr50-.*\[" /var/log/messages'
UPDATE_FAILURE = 'unexpected cr50-update exit code'
DUMMY_VER = '-1.-1.-1'

# This dictionary is used to search the usb_updater output for the version
# strings. There are two usb_updater commands that will return versions:
# 'fwver' and 'binvers'.
#
# 'fwver'   is used to get the running RO and RW versions from cr50
# 'binvers' gets the version strings for each RO and RW region in the given
#           file
#
# The value in the dictionary is the regular expression that can be used to
# find the version strings for each region.
#
# The group names matter: FindVersion() turns the match into a dictionary and
# GetVersion() picks the entries whose key contains 'ro' or 'rw', so every
# group has to be named and the name has to contain the region it describes.
VERSION_RE = {
    "--fwver": r'\nRO (?P<ro>\S+).*\nRW (?P<rw>\S+)',
    "--binvers": (r'RO_A:(?P<ro_a>\S+).*RW_A:(?P<rw_a>\S+).*'
                  r'RO_B:(?P<ro_b>\S+).*RW_B:(?P<rw_b>\S+)'),
}

UPDATE_TIMEOUT = 60
UPDATE_OK = 1

ERASED_BID_INT = 0xffffffff
# With an erased bid, the flags and board id will both be erased
ERASED_BID = (ERASED_BID_INT, ERASED_BID_INT)

# Parser used to classify the arguments that are about to be handed to
# usb_updater, so UsbUpdater() knows what exit status and reboot behavior to
# expect.
usb_update = argparse.ArgumentParser()
# use /dev/tpm0 to send the command
usb_update.add_argument('-s', '--systemdev', dest='systemdev',
                        action='store_true')
# fwver, binver, and board id are used to get information about cr50 or an
# image.
usb_update.add_argument('-b', '--binvers', '-f', '--fwver', '-i', '--board_id',
                        dest='info_cmd', action='store_true')
# upstart and post_reset will post resets instead of rebooting immediately
usb_update.add_argument('-u', '--upstart', '-p', '--post_reset',
                        dest='post_reset', action='store_true')
# Everything else (image paths, board id values, ...) is collected untouched.
usb_update.add_argument('extras', nargs=argparse.REMAINDER)


def AssertVersionsAreEqual(name_a, ver_a, name_b, ver_b):
    """Raise an error if ver_a isn't the same as ver_b

    Args:
        name_a: the name of section a
        ver_a: the version string for section a
        name_b: the name of section b
        ver_b: the version string for section b

    Raises:
        AssertionError if ver_a is not equal to ver_b
    """
    assert ver_a == ver_b, ("Versions do not match: %s %s %s %s" %
                            (name_a, ver_a, name_b, ver_b))


def GetNewestVersion(ver_a, ver_b):
    """Compare the versions. Return the newest one.

    The versions are dot separated integers and are compared field by field.

    Args:
        ver_a: the first version string
        ver_b: the second version string

    Returns:
        the newer of the two version strings or None if they are the same
    """
    a = [int(x) for x in ver_a.split('.')]
    b = [int(x) for x in ver_b.split('.')]

    if a > b:
        return ver_a
    if b > a:
        return ver_b
    return None


def GetVersion(versions, name):
    """Return the version string from the dictionary.

    Get the version for each key in the versions dictionary that contains the
    substring name. Make sure all of the versions match and return the version
    string. Raise an error if the versions don't match.

    Args:
        versions: dictionary with the partition names as keys and the
                  partition version strings as values.
        name: the string used to find the relevant items in versions.

    Returns:
        the version from versions, "-1.-1.-1" if one of the matching entries
        has that invalid version, or None if no key contains name.

    Raises:
        AssertionError if two matching entries have different versions
    """
    ver = None
    key = None
    # items() works on both python 2 and python 3 dictionaries.
    for k, v in versions.items():
        if name in k:
            if v == DUMMY_VER:
                logging.info("Detected invalid %s %s", name, v)
                return v
            elif ver:
                AssertVersionsAreEqual(key, ver, k, v)
            else:
                # Remember the first match so later ones can be compared to it
                ver = v
                key = k
    return ver


def FindVersion(output, arg):
    """Find the ro and rw versions.

    Args:
        output: The string to search
        arg: string representing the usb_updater option, either '--binvers'
             or '--fwver'

    Returns:
        a tuple of the ro and rw versions

    Raises:
        TestFail if the version strings could not be found in output
    """
    match = re.search(VERSION_RE[arg], output)
    if match is None:
        # Fail with the offending output instead of an AttributeError on None
        raise error.TestFail("Unable to find %s versions in: %r" %
                             (arg, output))
    versions = match.groupdict()
    ro = GetVersion(versions, RO)
    rw = GetVersion(versions, RW)
    return ro, rw


def GetSavedVersion(client):
    """Return the saved version from /var/cache/cr50-version

    Args:
        client: the object to run commands on

    Returns:
        a tuple of the saved ro and rw versions
    """
    result = client.run(GET_CR50_VERSION).stdout.strip()
    return FindVersion(result, "--fwver")


def UsbUpdater(client, args):
    """Run usb_update with the given args.

    Args:
        client: the object to run commands on
        args: a list of strings that contain the usb_updater args

    Returns:
        the result of usb_update

    Raises:
        TestFail if usb_updater exits with a status other than 0 or 1
    """
    options = usb_update.parse_args(args)

    # trunksd is stopped before the command is sent through /dev/tpm0
    result = client.run("status trunksd")
    if options.systemdev and 'running' in result.stdout:
        client.run("stop trunksd")

    # If we are updating the cr50 image, usb_update will return a non-zero exit
    # status so we should ignore it.
    ignore_status = not options.info_cmd
    # immediate reboots are only honored if the command is sent using /dev/tpm0
    expect_reboot = (options.systemdev and not options.post_reset and
                     not options.info_cmd)

    result = client.run("usb_updater %s" % ' '.join(args),
                        ignore_status=ignore_status,
                        ignore_timeout=expect_reboot,
                        timeout=UPDATE_TIMEOUT)

    # After a posted reboot, the usb_update exit code should equal 1.
    # NOTE(review): with ignore_timeout set, run() presumably returns None when
    # the command times out because of the reboot - confirm against the client
    # API. The None check keeps that case from raising an AttributeError.
    if (result is not None and result.exit_status and
            result.exit_status != UPDATE_OK):
        logging.debug(result)
        raise error.TestFail("Unexpected usb_update exit code after %s %d" %
                             (' '.join(args), result.exit_status))
    return result


def GetVersionFromUpdater(client, args):
    """Return the version from usb_updater

    Args:
        client: the object to run commands on
        args: the usb_updater args. The first one has to be '--fwver' or
              '--binvers', because it selects the regular expression used to
              parse the output.

    Returns:
        a tuple of the ro and rw versions
    """
    result = UsbUpdater(client, args).stdout.strip()
    return FindVersion(result, args[0])


def GetFwVersion(client):
    """Get the running version using 'usb_updater --fwver'"""
    return GetVersionFromUpdater(client, ['--fwver', '-s'])


def GetBinVersion(client, image=CR50_FILE):
    """Get the image version using 'usb_updater --binvers image'"""
    # TODO(mruthven) b/37958867: change to ["--binvers", image] when usb_updater
    # is fixed
    return GetVersionFromUpdater(client, ['--binvers', image, image, '-s'])


def GetVersionString(ver):
    """Return the (ro, rw) version tuple formatted as 'RO <ro> RW <rw>'"""
    return 'RO %s RW %s' % (ver[0], ver[1])


def GetRunningVersion(client):
    """Get the running Cr50 version.

    The version from usb_updater and /var/cache/cr50-version should be the
    same. Get both versions and make sure they match.

    Args:
        client: the object to run commands on

    Returns:
        running_ver: a tuple with the ro and rw version strings

    Raises:
        AssertionError if the version in /var/cache/cr50-version is not the
        same as the version from 'usb_updater --fwver'
    """
    running_ver = GetFwVersion(client)
    saved_ver = GetSavedVersion(client)

    AssertVersionsAreEqual("Running", GetVersionString(running_ver),
                           "Saved", GetVersionString(saved_ver))
    return running_ver


def CheckForFailures(client, last_message):
    """Check for any unexpected cr50-update exit codes.

    This only checks the cr50 update messages that have happened since
    last_message. If an unexpected exit code is detected it will raise an
    error. When last_message is empty there is nothing to compare against, so
    no messages are checked and only the newest message is returned.

    Args:
        client: the object to run commands on
        last_message: the last cr50 message from the last update run

    Returns:
        the last cr50 message in /var/log/messages. If nothing was logged
        after last_message, last_message itself is returned.

    Raises:
        TestFail - If there is an unexpected cr50-update exit code after
                   last_message in /var/log/messages
    """
    messages = client.run(GET_CR50_MESSAGES).stdout.strip()
    if not last_message:
        # Always hand back a whole line. Indexing the raw string would return
        # only its last character (or fail on an empty log).
        return messages.split('\n')[-1]

    # Only look at what was logged after the last occurrence of last_message
    recent = messages.rsplit(last_message, 1)[-1].split('\n')
    failures = [message for message in recent if UPDATE_FAILURE in message]
    if failures:
        logging.info(recent)
        raise error.TestFail("Detected unexpected exit code during update: "
                             "%s" % failures)
    # Keep the old marker if there are no new messages, so the next check
    # still knows where to start.
    return recent[-1] or last_message


def VerifyUpdate(client, ver='', last_message=''):
    """Verify that the saved update state is correct and there were no
    unexpected cr50-update exit codes since the last update.

    Args:
        client: the object to run commands on
        ver: the expected version tuple (ro ver, rw ver)
        last_message: the last cr50 message from the last update run

    Returns:
        new_ver: a tuple containing the running ro and rw versions
        last_message: The last cr50 update message in /var/log/messages

    Raises:
        TestFail if an unexpected cr50-update exit code was logged
        AssertionError if the running version does not match ver
    """
    # Check that there were no unexpected reboots from cr50-result
    last_message = CheckForFailures(client, last_message)
    logging.debug("last cr50 message %s", last_message)

    new_ver = GetRunningVersion(client)
    if ver != '':
        # The RO version can only be compared if the expected one is valid
        if DUMMY_VER != ver[0]:
            AssertVersionsAreEqual("Old RO", ver[0], "Updated RO", new_ver[0])
        AssertVersionsAreEqual("Old RW", ver[1], "Updated RW", new_ver[1])
    return new_ver, last_message


def ClearUpdateStateAndReboot(client):
    """Removes the cr50 status files in /var/cache and reboots the AP"""
    client.run("rm %s" % CR50_STATE)
    client.reboot()


def InstallImage(client, src, dest=CR50_FILE):
    """Copy the image at src to dest on the dut

    Args:
        client: the object to run commands on
        src: the image location of the server
        dest: the desired location on the dut

    Returns:
        The filename where the image was copied to on the dut, a tuple
        containing the RO and RW version of the file
    """
    # Send the file to the DUT
    client.send_file(src, dest)

    ver = GetBinVersion(client, dest)
    client.run("sync")
    return dest, ver


def GetSymbolicBoardId(symbolic_board_id):
    """Convert the symbolic board id str to an int

    Args:
        symbolic_board_id: an ASCII string. It can be up to 4 characters

    Returns:
        the symbolic board id string converted to an int
    """
    board_id = 0
    # Pack one character per byte, the first character ends up most significant
    for c in symbolic_board_id:
        board_id = ord(c) | (board_id << 8)
    return board_id


def GetExpectedBoardId(board_id):
    """Return the usb_updater interpretation of board_id

    Args:
        board_id: an int or string value of the board id. Strings of up to 4
                  characters are treated as symbolic board ids, longer ones as
                  hex values.

    Returns:
        an int representation of the board id
    """
    if isinstance(board_id, int):
        return board_id

    if len(board_id) <= 4:
        return GetSymbolicBoardId(board_id)

    return int(board_id, 16)


def GetExpectedFlags(flags):
    """If flags are not specified, usb_updater will set them to 0xff00

    Args:
        flags: The int value or None

    Returns:
        the original flags or 0xff00 if flags is None
    """
    return flags if flags is not None else 0xff00


def GetBoardId(client):
    """Return the board id and flags

    Args:
        client: the object to run commands on

    Returns:
        a tuple with the int values of the board id and the flags

    Raises:
        TestFail if the second board id response field is not ~board_id
    """
    result = UsbUpdater(client, ["-i"]).stdout.strip()
    # The text after "Board ID space: " holds three ':' separated hex fields
    board_id_info = result.split("Board ID space: ")[-1].strip().split(":")
    board_id, board_id_inv, flags = [int(val, 16) for val in board_id_info]
    logging.info('BOARD_ID: %x:%x:%x', board_id, board_id_inv, flags)

    if board_id == board_id_inv == flags == ERASED_BID_INT:
        logging.info('board id is erased')
    elif board_id & board_id_inv:
        # A value and its inverse can't have any set bits in common
        raise error.TestFail('board_id_inv should be ~board_id got %x %x' %
                             (board_id, board_id_inv))
    return board_id, flags


def CheckBoardId(client, board_id, flags):
    """Compare the given board_id and flags to the running board_id and flags

    Interpret board_id and flags how usb_updater would interpret them, then
    compare those interpreted values to the running board_id and flags.

    Args:
        client: the object to run commands on
        board_id: a hex, symbolic or int value for board_id
        flags: the int value of flags or None

    Raises:
        TestFail if the new board id info does not match
    """
    # Read back the board id and flags
    new_board_id, new_flags = GetBoardId(client)

    expected_board_id = GetExpectedBoardId(board_id)
    expected_flags = GetExpectedFlags(flags)

    if new_board_id != expected_board_id or new_flags != expected_flags:
        raise error.TestFail('Failed to set board id expected %x:%x, but got '
                             '%x:%x' % (expected_board_id, expected_flags,
                                        new_board_id, new_flags))


def SetBoardId(client, board_id, flags=None):
    """Sets the board id and flags

    Args:
        client: the object to run commands on
        board_id: a string of the symbolic board id or board id hex value. If
                  the string is 4 characters or shorter it will be considered
                  a symbolic value
        flags: the desired flag value. If it is None, no flags are passed to
               usb_updater.

    Raises:
        TestFail if we were unable to set the flags to the correct value
    """
    board_id_arg = board_id
    if flags is not None:
        board_id_arg += ':' + hex(flags)

    # Set the board id using the given board id and flags
    UsbUpdater(client, ["-s", "-i", board_id_arg])

    # Read the values back to make sure they were really set
    CheckBoardId(client, board_id, flags)