# Lint as: python2, python3 # Copyright 2017 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. from __future__ import absolute_import from __future__ import division from __future__ import print_function import base64 import json import logging import os import re import shutil import time from six.moves import zip from six.moves import zip_longest import six.moves.urllib.parse from datetime import datetime, timedelta from xml.etree import ElementTree from autotest_lib.client.common_lib import autotemp from autotest_lib.client.common_lib import autotest_enum from autotest_lib.client.common_lib import error from autotest_lib.client.common_lib import global_config from autotest_lib.client.common_lib import lsbrelease_utils from autotest_lib.client.common_lib import utils from autotest_lib.client.common_lib.cros import dev_server from autotest_lib.client.cros import constants from autotest_lib.client.cros.update_engine import dlc_util from autotest_lib.client.cros.update_engine import update_engine_event as uee from autotest_lib.client.cros.update_engine import update_engine_util from autotest_lib.server import autotest from autotest_lib.server import test from autotest_lib.server.cros import gsutil_wrapper from autotest_lib.server.cros.dynamic_suite import tools from autotest_lib.utils.frozen_chromite.lib import auto_updater from autotest_lib.utils.frozen_chromite.lib import auto_updater_transfer from autotest_lib.utils.frozen_chromite.lib import constants as chromite_constants from autotest_lib.utils.frozen_chromite.lib import gob_util from autotest_lib.utils.frozen_chromite.lib import osutils from autotest_lib.utils.frozen_chromite.lib import remote_access from autotest_lib.utils.frozen_chromite.lib import retry_util class UpdateEngineTest(test.test, update_engine_util.UpdateEngineUtil): """Base class for all autoupdate_ server tests. Contains useful functions shared between tests like staging payloads on devservers, verifying hostlogs, and launching client tests. """ version = 1 # Timeout periods, given in seconds. _INITIAL_CHECK_TIMEOUT = 12 * 60 _DOWNLOAD_STARTED_TIMEOUT = 4 * 60 _DOWNLOAD_FINISHED_TIMEOUT = 60 * 60 _UPDATE_COMPLETED_TIMEOUT = 8 * 60 _POST_REBOOT_TIMEOUT = 15 * 60 # Name of the logfile generated by nebraska.py. _NEBRASKA_LOG = 'nebraska.log' # Version we tell the DUT it is on before update. _CUSTOM_LSB_VERSION = '0.0.0.0' _CELLULAR_BUCKET = 'gs://chromeos-throw-away-bucket/CrOSPayloads/Cellular/' _TIMESTAMP_FORMAT = '%Y-%m-%d %H:%M:%S' # Paygen.json file provides information about all builds on all channels. _PAYGEN_JSON_URI = 'gs://chromeos-build-release-console/paygen.json' # Subtest to use for logging into DUTs in a test. _LOGIN_TEST = 'login_LoginSuccess' _LOGIN_TEST_PIN = 'login_LoginPin' _CORRUPT_STATEFUL_PATH = '/mnt/stateful_partition/.corrupt_stateful' _STATEFUL_ARCHIVE_NAME = 'stateful.tgz' _KERNEL_ARCHIVE_NAME = 'full_dev_part_KERN.bin.gz' _ROOTFS_ARCHIVE_NAME = 'full_dev_part_ROOT.bin.gz' _PAYLOAD_TYPE = autotest_enum.AutotestEnum('CROS', 'DLC', 'MINIOS') def initialize(self, host=None): """ Sets default variables for the test. @param host: The DUT we will be running on. """ self._current_timestamp = None self._host = host # Define functions used in update_engine_util. self._run = self._host.run if self._host else None self._get_file = self._host.get_file if self._host else None # Utilities for DLC management self._dlc_util = dlc_util.DLCUtil(self._run) # URL pointing to the autotest package on a lab cache server. It is # used by lab runs to select the right board+build when finding the # update payloads. The cache server will also be used for downloading # the update. self._job_repo_url = None # The target build for the update. Uses the release builder path # format, ex: octopus-release/R102-14650.0.0 self._build = None # Flag to indicate that the test has progressed far enough that # stateful should be restored on failure. self._should_restore_stateful = False self._autotest_devserver = None def cleanup(self): """Clean up update_engine autotests.""" if self._host: self._host.get_file(self._UPDATE_ENGINE_LOG, self.resultsdir) def _get_release_builder_path(self): """ Returns the release builder path currently provisioned on the device which can be used to get the current board and build number. Ex: octopus-release/R102-14650.0.0 """ lsb_release_content = self._run(['cat', constants.LSB_RELEASE]).stdout builder_path = lsbrelease_utils.get_chromeos_release_builder_path( lsb_release_content) logging.info("Current release builder path on the DUT is %s", builder_path) return builder_path def _get_expected_events_for_rootfs_update(self, source_release): """ Creates a list of expected events fired during a rootfs update. There are 4 events fired during a rootfs update. We will create these in the correct order. @param source_release: The source build version. """ return [ uee.UpdateEngineEvent( version=source_release, timeout=self._INITIAL_CHECK_TIMEOUT), uee.UpdateEngineEvent( event_type=uee.EVENT_TYPE_DOWNLOAD_STARTED, event_result=uee.EVENT_RESULT_SUCCESS, version=source_release, timeout=self._DOWNLOAD_STARTED_TIMEOUT), uee.UpdateEngineEvent( event_type=uee.EVENT_TYPE_DOWNLOAD_FINISHED, event_result=uee.EVENT_RESULT_SUCCESS, version=source_release, timeout=self._DOWNLOAD_FINISHED_TIMEOUT), uee.UpdateEngineEvent( event_type=uee.EVENT_TYPE_UPDATE_COMPLETE, event_result=uee.EVENT_RESULT_SUCCESS, version=source_release, timeout=self._UPDATE_COMPLETED_TIMEOUT) ] def _get_expected_event_for_post_reboot_check(self, source_release, target_release): """ Creates the expected event fired during post-reboot update check. @param source_release: The source build version. @param target_release: The target build version. """ return [ uee.UpdateEngineEvent( event_type=uee.EVENT_TYPE_REBOOTED_AFTER_UPDATE, event_result=uee.EVENT_RESULT_SUCCESS, version=target_release, previous_version=source_release, timeout = self._POST_REBOOT_TIMEOUT) ] def _verify_event_with_timeout(self, expected_event, actual_event): """ Verify an expected event occurred before its timeout. @param expected_event: an expected event. @param actual_event: an actual event from the hostlog. @return None if event complies, an error string otherwise. """ logging.info('Expecting %s within %s seconds', expected_event, expected_event._timeout) if not actual_event: return ('No entry found for %s event.' % uee.get_event_type (expected_event._expected_attrs['event_type'])) logging.info('Consumed new event: %s', actual_event) # If this is the first event, set it as the current time if self._current_timestamp is None: self._current_timestamp = datetime.strptime( actual_event['timestamp'], self._TIMESTAMP_FORMAT) # Get the time stamp for the current event and convert to datetime timestamp = actual_event['timestamp'] event_timestamp = datetime.strptime(timestamp, self._TIMESTAMP_FORMAT) # If the event happened before the timeout difference = event_timestamp - self._current_timestamp # We check if the difference > 7 hs. If so we assume that this is due to # timezone jump from local to utc caused by log format change # crrev.com/c/2652108 and adjust accordingly. Another assumption here is # that the DUT are in PST timezone. The jump will be 7 hs with DST and 8 # hs without. This hack should be removed once reasonable time has # passed and we do not need to consider the old log format anymore. # TODO(crbug.com/c/1178930): Remove the hack below. if difference > timedelta(hours=7): logging.info( 'Detected a timezone jump with difference %s with event %s', difference, uee.get_event_type( expected_event._expected_attrs['event_type'])) if difference > timedelta(hours=8): difference -= timedelta(hours=8) else: difference -= timedelta(hours=7) if difference < timedelta(seconds=expected_event._timeout): logging.info('Event took %s seconds to fire during the ' 'update', difference.seconds) self._current_timestamp = event_timestamp mismatched_attrs = expected_event.equals(actual_event) if mismatched_attrs is None: return None else: return self._error_incorrect_event( expected_event, actual_event, mismatched_attrs) else: return self._timeout_error_message(expected_event, difference.seconds) def _error_incorrect_event(self, expected, actual, mismatched_attrs): """ Error message for when an event is not what we expect. @param expected: The expected event that did not match the hostlog. @param actual: The actual event with the mismatched arg(s). @param mismatched_attrs: A list of mismatched attributes. """ et = uee.get_event_type(expected._expected_attrs['event_type']) return ('Event %s had mismatched attributes: %s. We expected %s, but ' 'got %s.' % (et, mismatched_attrs, expected, actual)) def _timeout_error_message(self, expected, time_taken): """ Error message for when an event takes too long to fire. @param expected: The expected event that timed out. @param time_taken: How long it actually took. """ et = uee.get_event_type(expected._expected_attrs['event_type']) return ('Event %s should take less than %ds. It took %ds.' % (et, expected._timeout, time_taken)) def _stage_payload_by_uri(self, payload_uri, properties_file=True): """Stage a payload based on its GS URI. This infers the build's label, filename and GS archive from the provided GS URI. @param payload_uri: The full GS URI of the payload. @param properties_file: If true, it will stage the update payload properties file too. @return URL of the staged payload (and properties file) on the server. @raise error.TestError if there's a problem with staging. """ archive_url, _, filename = payload_uri.rpartition('/') build_name = six.moves.urllib.parse.urlsplit(archive_url).path.strip( '/') filenames = [filename] if properties_file: filenames.append(filename + '.json') try: if self._autotest_devserver is None: self._autotest_devserver = dev_server.ImageServer.resolve( build_name, self._host.hostname) self._autotest_devserver.stage_artifacts(image=build_name, files=filenames, archive_url=archive_url) return (self._autotest_devserver.get_staged_file_url(f, build_name) for f in filenames) except dev_server.DevServerException as e: raise error.TestError('Failed to stage payload: %s' % e) def _get_devserver_for_test(self, test_conf): """Find a devserver to use. We use the payload URI as the hash for ImageServer.resolve. The chosen devserver needs to respect the location of the host if 'prefer_local_devserver' is set to True or 'restricted_subnets' is set. @param test_conf: a dictionary of test settings. """ autotest_devserver = dev_server.ImageServer.resolve( test_conf['target_payload_uri'], self._host.hostname) devserver_hostname = six.moves.urllib.parse.urlparse( autotest_devserver.url()).hostname logging.info('Devserver chosen for this run: %s', devserver_hostname) return autotest_devserver def _get_payload_url(self, full_payload=True, payload_type=_PAYLOAD_TYPE.CROS): """ Gets the GStorage URL of the full or delta payload for the target update version for either platform or DLC payloads. @param full_payload: True for full payload. False for delta. @param payload_type: The type of payload to get. Can be a value of the _PAYLOAD_TYPE enum. @returns the payload URL. For example, a full payload URL looks like: gs://chromeos-image-archive/octopus-release/R102-14650.0.0/chromeos_R102-14650.0.0_octopus_full_dev.bin """ image_path = global_config.global_config.get_config_value( 'CROS', 'image_storage_server', type=str) # This forces a trailing '/'' if not there yet. gs = os.path.join(image_path, '') # Example payload names (AU): # chromeos_R85-13265.0.0_eve_full_dev.bin # chromeos_R85-13265.0.0_R85-13265.0.0_eve_delta_dev.bin # Example payload names (DLC): # dlc_sample-dlc_package_R85-13265.0.0_eve_full_dev.bin # dlc_sample-dlc_package_R85-13265.0.0_R85-13265.0.0_eve_delta_dev.bin # Example payload names (MiniOS): # minios_R102-14667.0.0_guybrush_full_dev.bin # minios_R102-14667.0.0_R102-14667.0.0_guybrush_delta_dev.bin if payload_type is self._PAYLOAD_TYPE.DLC: payload_prefix = 'dlc_*_%s_*.bin' elif payload_type is self._PAYLOAD_TYPE.MINIOS: payload_prefix = 'minios_*_%s_*.bin' else: payload_prefix = 'chromeos_*_%s_*.bin' regex = payload_prefix % ('full' if full_payload else 'delta') payload_url_regex = gs + self._build + '/' + regex logging.debug('Trying to find payloads at %s', payload_url_regex) payloads = utils.gs_ls(payload_url_regex) if not payloads: raise error.TestFail('Could not find payload for %s', self._build) logging.debug('Payloads found: %s', payloads) return payloads[0] @staticmethod def _get_stateful_uri(build_uri): """Returns a complete GS URI of a stateful update given a build path.""" return '/'.join([build_uri.rstrip('/'), 'stateful.tgz']) def _get_job_repo_url(self, job_repo_url=None): """Gets the job_repo_url argument supplied to the test by the lab.""" if job_repo_url is not None: return job_repo_url if self._host is None: raise error.TestFail('No host specified by AU test.') info = self._host.host_info_store.get() return info.attributes.get(self._host.job_repo_url_attribute, '') def _stage_payloads(self, payload_uri, archive_uri): """ Stages payloads on the devserver. @param payload_uri: URI for a GS payload to stage. @param archive_uri: URI for GS folder containing payloads. This is used to find the related stateful payload. @returns URI of staged payload, URI of staged stateful. """ if not payload_uri: return None, None staged_uri, _ = self._stage_payload_by_uri(payload_uri) logging.info('Staged %s at %s.', payload_uri, staged_uri) # Figure out where to get the matching stateful payload. if archive_uri: stateful_uri = self._get_stateful_uri(archive_uri) else: stateful_uri = self._payload_to_stateful_uri(payload_uri) staged_stateful = self._stage_payload_by_uri(stateful_uri, properties_file=False) logging.info('Staged stateful from %s at %s.', stateful_uri, staged_stateful) return staged_uri, staged_stateful def _payload_to_stateful_uri(self, payload_uri): """Given a payload GS URI, returns the corresponding stateful URI.""" build_uri = payload_uri.rpartition('/payloads/')[0] return self._get_stateful_uri(build_uri) def _copy_payload_to_public_bucket(self, payload_url, use_globbing=True, destination_filename=None): """ Copy payload and make link public (if not already there). @param payload_url: Payload URL on Google Storage. @param use_globbing: Use globbing with payload url as prefix. @param destination_filename: Filename of payload on public bucket if it should be different from the source filename. Note that gsutil will treat this as destination directory if `use_globbing` is true and resolves to multiple files. @returns The payload URL that is now publicly accessible. """ payload_filename = payload_url.rpartition('/')[2] if destination_filename: payload_filename = destination_filename new_gs_url = self._CELLULAR_BUCKET + payload_filename public_url = new_gs_url.replace('gs://', 'https://storage.googleapis.com/') src_url = '%s*' % payload_url if use_globbing else payload_url dst_url = new_gs_url if destination_filename else self._CELLULAR_BUCKET # Check if public bucket already has the payload. try: payloads = utils.gs_ls(new_gs_url) if payloads: logging.info( 'Payload already exists in public bucket. Returning ' 'url to existing payload') return public_url except error.CmdError: logging.warning('No existing payload exists. Copying payload...') utils.run(['gsutil', 'cp', '-n', src_url, dst_url]) utils.run(['gsutil', 'acl', 'ch', '-u', 'AllUsers:R', '%s*' % new_gs_url]) return public_url def _suspend_then_resume(self): """Suspends and resumes the host DUT.""" try: self._host.suspend(suspend_time=30) except error.AutoservSuspendError: logging.exception('Suspend did not last the entire time.') def _run_client_test_and_check_result(self, test_name, **kwargs): """ Kicks of a client autotest and checks that it didn't fail. @param test_name: client test name @param **kwargs: key-value arguments to pass to the test. """ client_at = autotest.Autotest(self._host) client_at.run_test(test_name, **kwargs) client_at._check_client_test_result(self._host, test_name) def _extract_request_logs(self, update_engine_log, is_dlc=False): """ Extracts request logs from an update_engine log. @param update_engine_log: The update_engine log as a string. @param is_dlc: True to return the request logs for the DLC updates instead of the platform update. @returns a list object representing the platform (OS) request logs, or a dictionary of lists representing DLC request logs, keyed by DLC ID, if is_dlc is True. """ # Looking for all request XML strings in the log. pattern = re.compile(r'', re.DOTALL) requests = pattern.findall(update_engine_log) # We are looking for patterns like this: # "2021-01-28T10:14:33.998217Z INFO update_engine: \ # [omaha_request_action.cc(794)] Request: 0 utc_offset = timedelta( seconds=(time.altzone if is_dst else time.timezone)) if utc_offset > timedelta(seconds=0): logging.info('Parsing old log format. Adding utc offset of %s', utc_offset) else: logging.warning( 'Local time to UTC conversion might fail. utc_offset=%s. time.altzone=%s, time.timezone=%s', utc_offset, time.altzone, time.timezone) timestamps = [ # Just use the current year since the logs don't have the year # value. Let's all hope tests don't start to fail on new year's # eve LOL. datetime( datetime.now().year, int(ts[0][0:2]), # Month int(ts[0][2:4]), # Day int(ts[1][0:2]), # Hours int(ts[1][2:4]), # Minutes int(ts[1][4:6]) # Seconds ) + utc_offset for ts in timestamp_pattern_old.findall(update_engine_log) ] if len(requests) != len(timestamps): raise error.TestFail('Failed to properly parse the update_engine ' 'log file.') result = [] dlc_results = {} for timestamp, request in zip(timestamps, requests): root = ElementTree.fromstring(request) # There may be events for multiple apps if DLCs are installed. # See below (trimmed) example request including DLC: # # # # # # # # # # # # The first section is for the platform update. The second # is for the DLC update. # # Example without DLC: # # # # # # apps = root.findall('app') for app in apps: event = app.find('event') event_info = { 'version': app.attrib.get('version'), 'event_type': (int(event.attrib.get('eventtype')) if event is not None else None), 'event_result': (int(event.attrib.get('eventresult')) if event is not None else None), 'timestamp': timestamp.strftime(self._TIMESTAMP_FORMAT), } previous_version = (event.attrib.get('previousversion') if event is not None else None) if previous_version: event_info['previous_version'] = previous_version # Check if the event is for the platform update or for a DLC # by checking the appid. For platform, the appid looks like: # {DB5199C7-358B-4E1F-B4F6-AF6D2DD01A38} # For DLCs, it is the platform app ID + _ + the DLC ID: # {DB5199C7-358B-4E1F-B4F6-AF6D2DD01A38}_sample-dlc id_segments = app.attrib.get('appid').split('_') if len(id_segments) > 1: dlc_id = id_segments[1] if dlc_id in dlc_results: dlc_results[dlc_id].append(event_info) else: dlc_results[dlc_id] = [event_info] else: result.append(event_info) if is_dlc: logging.info('Extracted DLC request logs: %s', dlc_results) return dlc_results else: logging.info('Extracted platform (OS) request log: %s', result) return result def _create_hostlog_files(self, ignore_event_rootfs=False): """Create the two hostlog files for the update. To ensure the update was successful we need to compare the update events against expected update events. There is a hostlog for the rootfs update and for the post reboot update check. @param ignore_event_rootfs: Ignores the last event in the rootfs log. """ # Check that update logs exist for the update that just happened. if len(self._get_update_engine_logs()) < 2: err_msg = 'update_engine logs are missing. Cannot verify update.' raise error.TestFail(err_msg) # Each time we reboot in the middle of an update we ping omaha again # for each update event. So parse the list backwards to get the final # events. rootfs_hostlog = os.path.join(self.resultsdir, 'hostlog_rootfs') with open(rootfs_hostlog, 'w') as fp: # There are four expected hostlog events during update. extract_logs = self._extract_request_logs( self._get_update_engine_log(1)) if ignore_event_rootfs: logs = extract_logs[-5:-1] else: logs = extract_logs[-4:] json.dump(logs, fp) reboot_hostlog = os.path.join(self.resultsdir, 'hostlog_reboot') with open(reboot_hostlog, 'w') as fp: # There is one expected hostlog events after reboot. json.dump(self._extract_request_logs( self._get_update_engine_log(0))[:1], fp) return rootfs_hostlog, reboot_hostlog def _create_dlc_hostlog_files(self): """Create the rootfs and reboot hostlog files for DLC updates. Each DLC has its own set of update requests in the logs together with the platform update requests. To ensure the DLC update was successful we will compare the update events against the expected events, which are the same expected events as for the platform update. There is a hostlog for the rootfs update and the post-reboot update check for each DLC. @returns two dictionaries, one for the rootfs DLC update and one for the post-reboot check. The keys are DLC IDs and the values are the hostlog filenames. """ dlc_rootfs_hostlogs = {} dlc_reboot_hostlogs = {} dlc_rootfs_request_logs = self._extract_request_logs( self._get_update_engine_log(1), is_dlc=True) for dlc_id in dlc_rootfs_request_logs: dlc_rootfs_hostlog = os.path.join(self.resultsdir, 'hostlog_' + dlc_id) dlc_rootfs_hostlogs[dlc_id] = dlc_rootfs_hostlog with open(dlc_rootfs_hostlog, 'w') as fp: # Same number of events for DLC updates as for platform json.dump(dlc_rootfs_request_logs[dlc_id][-4:], fp) dlc_reboot_request_logs = self._extract_request_logs( self._get_update_engine_log(0), is_dlc=True) for dlc_id in dlc_reboot_request_logs: dlc_reboot_hostlog = os.path.join(self.resultsdir, 'hostlog_' + dlc_id + '_reboot') dlc_reboot_hostlogs[dlc_id] = dlc_reboot_hostlog with open(dlc_reboot_hostlog, 'w') as fp: # Same number of events for DLC updates as for platform json.dump(dlc_reboot_request_logs[dlc_id][:1], fp) return dlc_rootfs_hostlogs, dlc_reboot_hostlogs def _set_active_p2p_host(self, host): """ Choose which p2p host device to run commands on. For P2P tests with multiple DUTs we need to be able to choose which host within self._hosts we want to issue commands on. @param host: The host to run commands on. """ self._set_util_functions(host.run, host.get_file) def _set_update_over_cellular_setting(self, update_over_cellular=True): """ Toggles the update_over_cellular setting in update_engine. @param update_over_cellular: True to enable, False to disable. """ answer = 'yes' if update_over_cellular else 'no' cmd = [self._UPDATE_ENGINE_CLIENT_CMD, '--update_over_cellular=%s' % answer] retry_util.RetryException(error.AutoservRunError, 2, self._run, cmd) def _copy_generated_nebraska_logs(self, logs_dir, identifier): """Copies nebraska logs from logs_dir into job results directory. The nebraska process on the device generates logs and stores those logs in a /tmp directory. The update engine generates update_engine.log during the auto-update which is also stored in the same /tmp directory. This method copies these logfiles from the /tmp directory into the job @param logs_dir: Directory containing paths to the log files generated by the nebraska process. @param identifier: A string that is appended to the logfile when it is saved so that multiple files with the same name can be differentiated. """ partial_filename = '%s_%s_%s' % ('%s', self._host.hostname, identifier) src_files = [ self._NEBRASKA_LOG, os.path.basename(self._UPDATE_ENGINE_LOG), ] for src_fname in src_files: source = os.path.join(logs_dir, src_fname) dest = os.path.join(self.resultsdir, partial_filename % src_fname) logging.debug('Copying logs from %s to %s', source, dest) try: shutil.copyfile(source, dest) except Exception as e: logging.error('Could not copy logs from %s into %s due to ' 'exception: %s', source, dest, e) @staticmethod def _get_update_parameters_from_uri(payload_uri): """Extract vars needed to update with a Google Storage payload URI. The two values we need are: (1) A build_name string e.g dev-channel/samus/9583.0.0 (2) A filename of the exact payload file to use for the update. This payload needs to have already been staged on the devserver. @param payload_uri: Google Storage URI to extract values from """ # gs://chromeos-releases/dev-channel/samus/9334.0.0/payloads/blah.bin # build_name = dev-channel/samus/9334.0.0 # payload_file = payloads/blah.bin build_name = payload_uri[:payload_uri.index('payloads/')] build_name = six.moves.urllib.parse.urlsplit(build_name).path.strip( '/') payload_file = payload_uri[payload_uri.index('payloads/'):] logging.debug('Extracted build_name: %s, payload_file: %s from %s.', build_name, payload_file, payload_uri) return build_name, payload_file def _update_stateful(self): # Tries to update stateful without clobbering it. return self._restore_stateful(clobber_stateful=False) def _restore_stateful(self, clobber_stateful=True, public_bucket=False): """ Restore the stateful partition after a destructive test. @param clobber_stateful: True to wipe clobber user state. @param public_bucket: True to restore from a public bucket. """ # Test failed before any update preparations began. No need to fix stateful. if not self._should_restore_stateful: return # Fallback to lab provisioning if this function fails to restore. self._run(['touch', self._CORRUPT_STATEFUL_PATH]) # Stage stateful payload. statefuldev_url = self._stage_stateful(public_bucket) logging.info('Restoring stateful partition...') # Setup local dir. self._run(['mkdir', '-p', '-m', '1777', '/usr/local/tmp']) # Download and extract the stateful payload. try: self._download_and_extract_stateful(statefuldev_url, self._STATEFUL_MOUNT_DIR) except error.AutoservRunError as e: err_str = 'Failed to restore the stateful partition' raise error.TestFail('%s: %s' % (err_str, str(e))) # Touch a file so changes are picked up after reboot. update_file = '/mnt/stateful_partition/.update_available' if clobber_stateful: self._run(['echo', '-n', 'clobber', '>', update_file]) else: self._run(['touch', update_file]) self._host.reboot() # Make sure python is available again. try: self._run(['python', '--version']) except error.AutoservRunError as e: err_str = 'Python not available after restoring stateful.' raise error.TestFail(err_str) logging.info('Stateful restored successfully.') def _download_and_extract_stateful(self, stateful_url, destination, keep_symlinks=False, members=None): """ Download and extract the stateful partition. @param stateful_url: The url of the stateful archive. @param destination: The directory that the stateful archive will be extracted into. @param keep_symlinks: Don't overwrite symlinks in destination directory. @param members: When provided, they specify the names of the stateful archive members to be extracted else everything will be extracted. """ cmd = [ 'curl', '--silent', '--show-error', '--max-time', '600', stateful_url, '|', '/bin/tar', '--ignore-command-error', '--overwrite', '--directory', destination, '-xz' ] if keep_symlinks: cmd += ['--keep-directory-symlink'] if members: # Normalize members to be a list. if not isinstance(members, list): members = [members] cmd += members self._run(cmd) def _stage_stateful(self, public_bucket=False): """ Stage the stateful archive for download. @param public_bucket: True to return archive on a public bucket. """ if public_bucket: statefuldev_url = self._get_stateful_url_on_public_bucket() else: # Stage stateful payload. ds_url, build = tools.get_devserver_build_from_package_url( self._job_repo_url) self._autotest_devserver = dev_server.ImageServer(ds_url) self._autotest_devserver.stage_artifacts(build, ['stateful']) update_url = self._autotest_devserver.get_update_url(build) statefuldev_url = update_url.replace('update', 'static') statefuldev_url += '/stateful.tgz' return statefuldev_url def verify_update_events(self, source_release, hostlog_filename, target_release=None): """Compares a hostlog file against a set of expected events. In this class we build a list of expected events (list of UpdateEngineEvent objects), and compare that against a "hostlog" returned from update_engine from the update. This hostlog is a json list of events fired during the update. @param source_release: The source build version. @param hostlog_filename: The path to a hotlog returned from nebraska. @param target_release: The target build version. """ if target_release is not None: expected_events = self._get_expected_event_for_post_reboot_check( source_release, target_release) else: expected_events = self._get_expected_events_for_rootfs_update( source_release) logging.info('Checking update against hostlog file: %s', hostlog_filename) try: with open(hostlog_filename, 'r') as fp: hostlog_events = json.load(fp) except Exception as e: raise error.TestFail('Error reading the hostlog file: %s' % e) for expected, actual in zip_longest(expected_events, hostlog_events): err_msg = self._verify_event_with_timeout(expected, actual) if err_msg is not None: raise error.TestFail(('Hostlog verification failed: %s ' % err_msg)) def get_payload_url_on_public_bucket(self, job_repo_url=None, full_payload=True, payload_type=_PAYLOAD_TYPE.CROS): """ Get the google storage url of the payload in a public bucket. We will be copying the payload to a public google storage bucket (similar location to updates via autest command). @param job_repo_url: string url containing the current build. @param full_payload: True for full, False for delta. @param payload_type: The type of payload to get. Can be a value of the _PAYLOAD_TYPE enum. """ if job_repo_url is not None: self._job_repo_url = job_repo_url _, build = tools.get_devserver_build_from_package_url( self._job_repo_url) self._build = build self._should_restore_stateful = True payload_url = self._get_payload_url(full_payload=full_payload, payload_type=payload_type) url = self._copy_payload_to_public_bucket(payload_url) logging.info('Public update URL: %s', url) return url def _get_stateful_url_on_public_bucket(self): """ Get the google storage URL of the payload stateful in a public bucket. We will be copying the payload to a public google storage bucket (similar location to updates via autest command). """ payload_url = self._get_payload_url() stateful_url = '/'.join( [payload_url.rpartition('/')[0], self._STATEFUL_ARCHIVE_NAME]) # We have a flat directory structure in the public directory. Therefore # we need to disambiguate the stateful archive by prepending full # payload name. stateful_filename = '_'.join([ os.path.splitext(payload_url.rpartition('/')[2])[0], self._STATEFUL_ARCHIVE_NAME ]) url = self._copy_payload_to_public_bucket( stateful_url, use_globbing=False, destination_filename=stateful_filename) logging.info('Public stateful URL: %s', url) return url def _get_provision_url_on_public_bucket(self, release_path): """ Copy the necessary artifacts for quick-provision to the public bucket and return the URL pointing to them. This is to enable local runs of tests that need to provision a source version on the DUT (such as m2n tests) without requiring lab cache server SSH access. @param release_path: path to the build artifacts in gs://chromeos-releases. Ex: dev-channel/asurada/14515.0.0. The output of _get_latest_serving_stable_build matches this format. """ # We have a flat directory structure in the public directory. Therefore # we need to disambiguate the path to the provision artifacts. new_gs_dir = os.path.join(self._CELLULAR_BUCKET, 'provision', release_path) src_gs_dir = os.path.join('gs://chromeos-releases', release_path) provision_artifacts = [ self._STATEFUL_ARCHIVE_NAME, self._ROOTFS_ARCHIVE_NAME, self._KERNEL_ARCHIVE_NAME ] for file in provision_artifacts: src_url = os.path.join(src_gs_dir, file) dst_url = os.path.join(new_gs_dir, file) utils.run([ 'gsutil', 'cp', '-n', '-a', 'public-read', src_url, dst_url ]) public_url = new_gs_dir.replace('gs://', 'https://storage.googleapis.com/') return public_url def _copy_quick_provision_to_dut(self): """ Copies the quick-provision script to the DUT from googlesource.""" tmp = autotemp.tempdir(unique_id='m2n') src = os.path.join(tmp.name, 'quick-provision') dst = '/usr/local/bin/quick-provision' logging.info('Downloading quick-provision from googlesource') qp_url_path = '%s/+/%s/%s?format=text' % ( 'chromiumos/platform/dev-util', 'refs/heads/main', 'quick-provision/quick-provision') contents_b64 = gob_util.FetchUrl(chromite_constants.EXTERNAL_GOB_HOST, qp_url_path) osutils.WriteFile(src, base64.b64decode(contents_b64).decode('utf-8')) self._host.send_file(src, dst) self._run(['chown', '$USER', dst]) self._run(['chmod', '755', dst]) def get_payload_for_nebraska(self, job_repo_url=None, build=None, full_payload=True, public_bucket=False, payload_type=_PAYLOAD_TYPE.CROS): """ Gets a platform or DLC payload URL to be used with a nebraska instance on the DUT. @param job_repo_url: string url containing the current build and cache server to use. @param build: string containing the build to use for the update, like R102-14644.0.0 (the milestone number is required). Only used for the public bucket update flow. @param full_payload: bool whether we want a full payload. @param public_bucket: True to return a payload on a public bucket. @param payload_type: The type of payload to get. Can be a value of the _PAYLOAD_TYPE enum. @returns string URL of a payload staged on a lab devserver. """ self._job_repo_url = self._get_job_repo_url(job_repo_url) if self._job_repo_url: logging.info('Getting payload for the build in job_repo_url') ds_url, build = tools.get_devserver_build_from_package_url( self._job_repo_url) self._autotest_devserver = dev_server.ImageServer(ds_url) self._build = build elif build is not None: logging.info('Getting payload for the build provided in test_that') # self._build looks like this: octopus-release/R102-14650.0.0 # Replace the version with the one provided in test_that. self._build = self._get_release_builder_path().rsplit( '/')[0] + '/' + build else: logging.info('Getting payload for the current build on the DUT') self._build = self._get_release_builder_path() logging.info("Getting payload for nebraska for build %s", self._build) if public_bucket: return self.get_payload_url_on_public_bucket( full_payload=full_payload, payload_type=payload_type) payload = self._get_payload_url(full_payload=full_payload, payload_type=payload_type) payload_url, _ = self._stage_payload_by_uri(payload) logging.info('Payload URL for Nebraska: %s', payload_url) self._should_restore_stateful = True return payload_url def update_device(self, payload_uri, clobber_stateful=False, tag='source', ignore_appid=False, m2n=False): """ Updates the device. Used by autoupdate_EndToEndTest and autoupdate_StatefulCompatibility, which use auto_updater to perform updates. @param payload_uri: The payload with which the device should be updated. @param clobber_stateful: Boolean that determines whether the stateful of the device should be force updated and the TPM ownership should be cleared. By default, set to False. @param tag: An identifier string added to each log filename. @param ignore_appid: True to tell Nebraska to ignore the App ID field when parsing the update request. This allows the target update to use a different board's image, which is needed for kernelnext updates. @param m2n: True for an m2n update. m2n update tests don't use signed payloads from gs://chromeos-releases/, so the payload paths need to be parsed differently. @raise error.TestFail if anything goes wrong with the update. """ cros_preserved_path = ('/mnt/stateful_partition/unencrypted/' 'preserve/cros-update') if m2n: # The payload_uri for an m2n update looks like: # http://100.115.220.112:8082/static/octopus-release/R102-14692.0.0/chromeos_R102-14692.0.0_octopus_full_dev.bin payload_path = payload_uri[payload_uri.index('static/'):] build_name = '/'.join(payload_path.split('/')[1:-1]) payload_filename = payload_path.split('/')[-1] else: # Otherwise the payload_uri looks like: # gs://chromeos-releases/dev-channel/octopus/14698.0.0/payloads/chromeos_14698.0.0_octopus_dev-channel_full_test.bin-gyzdkobygyzdck3swpkou632wan55vgx build_name, payload_filename = self._get_update_parameters_from_uri( payload_uri) logging.info('Installing %s on the DUT', payload_uri) with remote_access.ChromiumOSDeviceHandler( self._host.hostname, base_dir=cros_preserved_path) as device: updater = auto_updater.ChromiumOSUpdater( device, build_name, build_name, yes=True, payload_filename=payload_filename, clobber_stateful=clobber_stateful, clear_tpm_owner=clobber_stateful, do_stateful_update=True, staging_server=self._autotest_devserver.url(), transfer_class=auto_updater_transfer. LabEndToEndPayloadTransfer, ignore_appid=ignore_appid) try: updater.RunUpdate() except Exception as e: logging.exception('ERROR: Failed to update device.') raise error.TestFail(str(e)) finally: self._copy_generated_nebraska_logs( updater.request_logs_dir, identifier=tag) def _get_paygen_json(self): """Return the paygen.json file as a json dictionary.""" bucket, paygen_file = self._PAYGEN_JSON_URI.rsplit('/', 1) tmpdir = '/tmp/m2n/' self._host.run('mkdir -p %s' % tmpdir) gsutil_wrapper.copy_private_bucket(host=self._host, bucket=bucket, filename=paygen_file, destination=tmpdir) return json.loads( self._host.run('cat %s' % os.path.join(tmpdir, paygen_file), verbose=False).stdout) def _paygen_json_lookup(self, board, channel, delta_type): """ Filters the paygen.json file by board, channel, and payload type. @param board: The board name. @param channel: The ChromeOS channel. @param delta_type: OMAHA, FSI, MILESTONE. STEPPING_STONE. @returns json results filtered by the input params. """ paygen_data = self._get_paygen_json() result = [] if channel.endswith('-channel'): channel = channel[:-len('-channel')] for delta in paygen_data['delta']: if ((delta['board']['public_codename'] == board) and (delta.get('channel', None) == channel) and (delta.get('delta_type', None) == delta_type)): result.append(delta) return result def _get_latest_serving_stable_build(self): """ Returns the latest serving stable build on Omaha for the current board. It will lookup the paygen.json file and return the build label that can be passed to quick_provision. This is useful for M2N tests to easily find the first build to provision. @returns latest stable serving omaha build. """ if lsbrelease_utils.is_moblab(): raise error.TestNA("Moblab cannot run M2N tests. See b/193438616.") board = self._host.get_board().split(':')[1] # Boards like auron_paine are auron-paine in paygen.json and builders. if '_' in board: board = board.replace('_', '-') channel = 'stable-channel' delta_type = 'OMAHA' stable_paygen_data = self._paygen_json_lookup(board, channel, delta_type) if not stable_paygen_data: # Some unibuild boards can have ALL of their stable serving builds # also be an FSI. When this happens we will not find an OMAHA # payload to use because GE only publishes one for a channel+build # pair. So try to get the latest FSI on stable channel. logging.info('No OMAHA payloads found. Falling back to FSI') stable_paygen_data = self._paygen_json_lookup( board, channel, 'FSI') if not stable_paygen_data: raise error.TestFail( 'No stable build found in paygen.json for %s' % board) latest_stable_paygen_data = max( stable_paygen_data, key=(lambda key: key['chrome_os_version'])) return os.path.join(channel, board, latest_stable_paygen_data["chrome_os_version"])