# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Utilities for cellular tests."""
import base64
import copy
import dbus
import os
import tempfile

# TODO(thieule): Consider renaming mm.py, mm1.py, modem.py, etc to be more
# descriptive (crosbug.com/37060).
import common
from autotest_lib.client.bin import utils
from autotest_lib.client.common_lib import error
from autotest_lib.client.cros.cellular import cellular
from autotest_lib.client.cros.cellular import cellular_system_error
from autotest_lib.client.cros.cellular import mm
from autotest_lib.client.cros.cellular import modem

# NOTE: flimflam_test_path is imported immediately before flimflam, exactly
# as in the original ordering; keep these two imports adjacent and in order.
from autotest_lib.client.cros import flimflam_test_path
import flimflam

TIMEOUT = 30
SERVICE_TIMEOUT = 60

import cellular_logging

logger = cellular_logging.SetupCellularLogging('cell_tools')


def ConnectToCellular(flim, timeout=TIMEOUT):
    """Attempts to connect to a cell network using FlimFlam.

    If the cellular service is already in a connected state ('portal' or
    'online') no connect request is issued.

    Args:
        flim: A flimflam object
        timeout: Timeout (in seconds) before giving up on connect

    Returns:
        a tuple of the service and the service state

    Raises:
        cellular_system_error.ConnectionFailure: no cellular service was
            found, or the connect request failed for a reason other than
            already being connected.
        cellular_system_error.BadState: the service did not reach a
            connected state after the connect request.
    """
    service = flim.FindCellularService(timeout=timeout)
    if not service:
        raise cellular_system_error.ConnectionFailure(
            'Could not find cell service')

    properties = service.GetProperties(utf8_strings=True)
    # This is a routine diagnostic dump, not a failure, so it is logged at
    # INFO rather than ERROR.
    logger.info('Properties are: %s', properties)
    logger.info('Connecting to cell service: %s', service)

    states = ['portal', 'online', 'idle']
    state = flim.WaitForServiceState(service=service,
                                     expected_states=states,
                                     timeout=timeout,
                                     ignore_failure=True)[0]
    logger.debug('Cell connection state : %s ', state)

    connected_states = ['portal', 'online']
    if state in connected_states:
        logger.debug('Looks good, skip ConnectService')
        return service, state

    logger.debug('Trying to ConnectService')
    success, status = flim.ConnectService(
        service=service,
        assoc_timeout=timeout,
        config_timeout=timeout)

    if not success:
        logger.error('Connect failed: %s', status)
        # TODO(rochberg): Turn off autoconnect
        # An "already connected" failure is tolerated; the state check
        # below decides whether we are really connected.
        if 'Error.AlreadyConnected' not in status['reason']:
            raise cellular_system_error.ConnectionFailure(
                'Could not connect: %s.' % status)

    state = flim.WaitForServiceState(service=service,
                                     expected_states=connected_states,
                                     timeout=timeout,
                                     ignore_failure=True)[0]
    if state not in connected_states:
        raise cellular_system_error.BadState(
            'Still in state %s, expecting one of: %s ' %
            (state, str(connected_states)))

    return service, state


def FindLastGoodAPN(service, default=None):
    """Returns the 'apn' entry of the service's Cellular.LastGoodAPN property.

    Args:
        service: A cellular service object, or a false value.
        default: Value returned when no APN can be determined.

    Returns:
        The 'apn' value, or |default| if |service| is false, the service has
        no 'Cellular.LastGoodAPN' property, or that property has no 'apn' key.
    """
    if not service:
        return default
    props = service.GetProperties()
    if 'Cellular.LastGoodAPN' not in props:
        return default
    last_good_apn = props['Cellular.LastGoodAPN']
    return last_good_apn.get('apn', default)


def DisconnectFromCellularService(bs, flim, service):
    """Attempts to disconnect from the supplied cellular service.

    Args:
        bs: A basestation object. Pass None to skip basestation-side checks
        flim: A flimflam object
        service: A cellular service object

    Raises:
        cellular_system_error.BadState: the basestation still does not report
            the modem as disconnected after polling for 20 seconds.
    """
    flim.DisconnectService(service)  # Waits for flimflam state to go to idle

    if not bs:
        return

    verifier = bs.GetAirStateVerifier()
    # This is racy: The modem is free to report itself as
    # disconnected before it actually finishes tearing down its RF
    # connection.
    verifier.AssertDataStatusIn([
        cellular.UeGenericDataStatus.DISCONNECTING,
        cellular.UeGenericDataStatus.REGISTERED,
        cellular.UeGenericDataStatus.NONE,
    ])

    def _ModemIsFullyDisconnected():
        return verifier.IsDataStatusIn([
            cellular.UeGenericDataStatus.REGISTERED,
            cellular.UeGenericDataStatus.NONE,
        ])

    utils.poll_for_condition(
        _ModemIsFullyDisconnected,
        timeout=20,
        exception=cellular_system_error.BadState(
            'modem not disconnected from base station'))


def _EnumerateModems(manager):
    """Get a set of modem paths."""
    return set(x[1] for x in mm.EnumerateDevices(manager))


def _SawNewModem(manager, preexisting_modems, old_modem):
    """Returns True once |old_modem| is gone and the modem set has changed.

    Args:
        manager: Modem manager passed through to mm.EnumerateDevices.
        preexisting_modems: Set of modem paths to compare against.
        old_modem: Path of the modem that is expected to disappear.
    """
    current_modems = _EnumerateModems(manager)
    if old_modem in current_modems:
        return False
    # NB: This fails if an unrelated modem disappears.  Not fixing
    # until we support > 1 modem
    return preexisting_modems != current_modems


def _WaitForModemToReturn(manager, preexisting_modems_original, modem_path):
    """Waits for the modem at |modem_path| to re-enumerate; returns new path.

    Args:
        manager: Modem manager used to enumerate modems.
        preexisting_modems_original: Set of modem paths taken before the
            change; it must contain |modem_path|. The set is not modified.
        modem_path: Path of the modem that is expected to go away.

    Returns:
        The path of the single newly-appeared modem.

    Raises:
        cellular_system_error.BadState: no new modem showed up within 50
            seconds, or the number of new modems is not exactly one.
    """
    # Work on a copy so the caller's set is left untouched.
    preexisting_modems = copy.copy(preexisting_modems_original)
    preexisting_modems.remove(modem_path)

    utils.poll_for_condition(
        lambda: _SawNewModem(manager, preexisting_modems, modem_path),
        timeout=50,
        exception=cellular_system_error.BadState(
            'Modem did not come back after settings change'))

    current_modems = _EnumerateModems(manager)

    new_modems = list(current_modems - preexisting_modems)
    if len(new_modems) != 1:
        raise cellular_system_error.BadState(
            'Unexpected modem list change: %s vs %s' %
            (current_modems, new_modems))

    logger.info('New modem: %s', new_modems[0])
    return new_modems[0]


def SetFirmwareForTechnologyFamily(manager, modem_path, family):
    """Set the modem to firmware.  Return potentially-new modem path.

    Args:
        manager: Modem manager that owns the modem.
        modem_path: Path of the modem to reconfigure.
        family: Target cellular.TechnologyFamily value.

    Returns:
        The modem path after the change. For the LTE family nothing is
        changed and |modem_path| is returned as-is.

    Raises:
        cellular_system_error.BadScpiCommand: the modem exposes no Gobi
            interface, so its technology family cannot be changed.
        cellular_system_error.BadState: the modem did not come back after
            the carrier change.
    """
    # todo(byronk): put this in a modem object?
    if family == cellular.TechnologyFamily.LTE:
        # nothing to set up on a Pixel. todo(byronk) how about others?
        # Return the unchanged path (not None) so callers that assign the
        # result keep a usable modem path.
        return modem_path

    logger.debug('SetFirmwareForTechnologyFamily : manager : %s ', manager)
    logger.debug('SetFirmwareForTechnologyFamily : modem_path : %s ',
                 modem_path)
    logger.debug('SetFirmwareForTechnologyFamily : family : %s ', family)
    preexisting_modems = _EnumerateModems(manager)

    # We do not currently support any multi-family modems besides Gobi
    gobi = manager.GetModem(modem_path).GobiModem()
    if not gobi:
        # Both placeholders must be filled from a single tuple; passing the
        # values separately raises TypeError instead of BadScpiCommand.
        raise cellular_system_error.BadScpiCommand(
            'Modem %s does not support %s, cannot change technologies' %
            (modem_path, family))

    logger.info('Changing firmware to technology family %s', family)

    family_to_carrier = {
        cellular.TechnologyFamily.UMTS: 'Generic UMTS',
        cellular.TechnologyFamily.CDMA: 'Verizon Wireless',
    }

    gobi.SetCarrier(family_to_carrier[family])
    return _WaitForModemToReturn(manager, preexisting_modems, modem_path)


# A test PRL that has an ID of 3333 and sets the device to acquire the
# default config of an 8960 with system_id 331.  Base64 encoding
# Generated with "base64 < prl"
TEST_PRL_3333 = base64.b64decode(
    'ADENBQMAAMAAAYADAgmABgIKDQsEAYAKDUBAAQKWAAICQGAJApYAAgIw8BAAAQDhWA==')


# A modem with this MDN will always report itself as activated
TESTING_MDN = dbus.String('1115551212', variant_level=1)


def _IsCdmaModemConfiguredCorrectly(manager, modem_path):
    """Returns true iff the CDMA modem at modem_path is configured correctly.

    Every mismatching setting is logged, not just the first one.
    """
    # We don't test for systemID because the PRL should take care of
    # that.
    status = manager.GetModem(modem_path).SimpleModem().GetStatus()

    required_settings = {'mdn': TESTING_MDN,
                         'min': TESTING_MDN,
                         'prl_version': 3333}
    configured_correctly = True

    for rk, rv in required_settings.items():
        if rk not in status or rv != status[rk]:
            logger.error(
                '_IsCdmaModemConfiguredCorrectly: %s: expected %s, got %s',
                rk, rv, status.get(rk))
            configured_correctly = False
    return configured_correctly


def PrepareCdmaModem(manager, modem_path):
    """Configure a CDMA device (including PRL, MIN, and MDN).

    Args:
        manager: Modem manager that owns the modem.
        modem_path: Path of the CDMA modem.

    Returns:
        The modem path to use afterwards: |modem_path| if the modem was
        already configured, otherwise the path the modem came back on.

    Raises:
        cellular_system_error.BadState: the modem did not come back, or is
            still misconfigured after manual activation.
    """
    if _IsCdmaModemConfiguredCorrectly(manager, modem_path):
        return modem_path

    logger.info('Updating modem settings')
    preexisting_modems = _EnumerateModems(manager)
    cdma = manager.GetModem(modem_path).CdmaModem()

    with tempfile.NamedTemporaryFile() as f:
        # The PRL file path is handed to the modem over D-Bus below.
        # NOTE(review): the permissive mode is presumably so the modem
        # manager process can read the file - confirm.
        os.chmod(f.name, 0o744)
        f.write(TEST_PRL_3333)
        f.flush()
        logger.info('Calling ActivateManual to change PRL')

        cdma.ActivateManual({
            'mdn': TESTING_MDN,
            'min': TESTING_MDN,
            'prlfile': dbus.String(f.name, variant_level=1),
            'system_id': dbus.UInt16(331, variant_level=1),  # Default 8960 SID
            'spc': dbus.String('000000'),
        })
        # Stay inside the 'with' so the temporary PRL file exists until the
        # modem has come back.
        new_path = _WaitForModemToReturn(
            manager, preexisting_modems, modem_path)

    if not _IsCdmaModemConfiguredCorrectly(manager, new_path):
        raise cellular_system_error.BadState('Modem configuration failed')
    return new_path


def PrepareModemForTechnology(modem_path, target_technology):
    """Prepare modem for the technology: Sets things like firmware, PRL.

    Args:
        modem_path: Modem path (or pattern) handed to mm.PickOneModem.
        target_technology: Key into cellular.TechnologyToFamily naming the
            technology the modem should be prepared for.

    Returns:
        The modem path to use after preparation.
    """
    manager, modem_path = mm.PickOneModem(modem_path)

    logger.info('Found modem %s', modem_path)

    # todo(byronk) : This returns TechnologyFamily:UMTS on a Pixel. ????
    current_family = manager.GetModem(modem_path).GetCurrentTechnologyFamily()
    target_family = cellular.TechnologyToFamily[target_technology]

    if current_family != target_family:
        logger.debug('Modem Current Family: %s ', current_family)
        logger.debug('Modem Target Family : %s ', target_family)
        modem_path = SetFirmwareForTechnologyFamily(
            manager, modem_path, target_family)

    if target_family == cellular.TechnologyFamily.CDMA:
        modem_path = PrepareCdmaModem(manager, modem_path)
        # Force the modem to report that is has been activated since we
        # use a custom PRL and have already manually activated it.
        manager.GetModem(modem_path).GobiModem().ForceModemActivatedStatus()

    # When testing EVDO, we need to force the modem to register with EVDO
    # directly (bypassing CDMA 1x RTT) else the modem will not register
    # properly because it looks for CDMA 1x RTT first but can't find it
    # because the call box can only emulate one technology at a time (EVDO).
    try:
        if target_technology == cellular.Technology.EVDO_1X:
            network_preference = modem.Modem.NETWORK_PREFERENCE_EVDO_1X
        else:
            network_preference = modem.Modem.NETWORK_PREFERENCE_AUTOMATIC
        gobi = manager.GetModem(modem_path).GobiModem()
        gobi.SetNetworkPreference(network_preference)
    except AttributeError:
        # Not a Gobi modem
        pass

    return modem_path


def FactoryResetModem(modem_pattern, spc='000000'):
    """Factory resets modem, returns DBus pathname of modem after reset.

    Args:
        modem_pattern: Pattern handed to mm.PickOneModem to select the modem.
        spc: Code passed to the modem's FactoryReset call.

    Raises:
        cellular_system_error.BadState: the modem did not come back.
    """
    manager, modem_path = mm.PickOneModem(modem_pattern)
    preexisting_modems = _EnumerateModems(manager)
    # Named modem_obj so the imported 'modem' module is not shadowed.
    modem_obj = manager.GetModem(modem_path).Modem()
    modem_obj.FactoryReset(spc)
    return _WaitForModemToReturn(manager, preexisting_modems, modem_path)


class OtherDeviceShutdownContext(object):
    """Context manager that shuts down other devices.

    On entry every device except those of |device_type| is shut down via
    flimflam.DeviceManager; on exit the devices are restored.

    Usage:
        with cell_tools.OtherDeviceShutdownContext('cellular'):
            block

    TODO(rochberg):  Replace flimflam.DeviceManager with this
    """

    def __init__(self, device_type):
        self.device_type = device_type
        # Created in __enter__; stays None if the context is never entered.
        self.device_manager = None

    def __enter__(self):
        self.device_manager = flimflam.DeviceManager(flimflam.FlimFlam())
        self.device_manager.ShutdownAllExcept(self.device_type)
        return self

    def __exit__(self, exception, value, traceback):
        if self.device_manager:
            self.device_manager.RestoreDevices()
        # Never suppress an exception raised inside the 'with' body.
        return False


class AutoConnectContext(object):
    """Context manager which sets autoconnect to either true or false.

    Enable or Disable autoconnect for the cellular service.
    Restore it when done.

    Usage:
        with cell_tools.AutoConnectContext(device, flim, autoconnect):
            block
    """

    def __init__(self, device, flim, autoconnect):
        self.device = device
        self.flim = flim
        self.autoconnect = autoconnect
        # Set by __enter__ when the property value actually changed, so
        # that __exit__ knows whether there is anything to restore.
        self.autoconnect_changed = False

    def PowerOnDevice(self, device):
        """Power on a flimflam device, ignoring in progress errors."""
        powered = device.GetProperties()['Powered']
        logger.info('powered = %s', powered)
        if powered:
            return
        try:
            device.Enable()
        except dbus.exceptions.DBusException as e:
            if e.get_dbus_name() != 'org.chromium.flimflam.Error.InProgress':
                # Bare raise keeps the original traceback.
                raise

    def __enter__(self):
        """Power up device, get the service and set autoconnect.

        Raises:
            error.TestFail: no cellular service is available, or the
                AutoConnect property does not hold the requested value
                after being set.
        """
        changed = False
        self.PowerOnDevice(self.device)

        # Use SERVICE_TIMEOUT*2 here because it may take SERVICE_TIMEOUT
        # seconds for the modem to disconnect when the base emulator is taken
        # offline for reconfiguration and then another SERVICE_TIMEOUT
        # seconds for the modem to reconnect after the base emulator is
        # brought back online.
        #
        # TODO(jglasgow): generalize to use services associated with device
        service = self.flim.FindCellularService(timeout=SERVICE_TIMEOUT * 2)
        if not service:
            raise error.TestFail('No cellular service available.')

        # Always set the AutoConnect property even if the requested value
        # is the same so that shill will retain the AutoConnect property, else
        # shill may override it.
        props = service.GetProperties()
        autoconnect = props['AutoConnect']
        logger.info('AutoConnect = %s', autoconnect)
        logger.info('Setting AutoConnect = %s.', self.autoconnect)
        service.SetProperty('AutoConnect', dbus.Boolean(self.autoconnect))

        if autoconnect != self.autoconnect:
            # Re-read the properties to verify that the new value stuck.
            props = service.GetProperties()
            autoconnect = props['AutoConnect']
            changed = True

        # Make sure the cellular service gets persisted by taking it out of
        # the ephemeral profile.
        if not props['Profile']:
            manager_props = self.flim.manager.GetProperties()
            active_profile = manager_props['ActiveProfile']
            logger.info("Setting cellular service profile to %s",
                        active_profile)
            service.SetProperty('Profile', active_profile)

        if autoconnect != self.autoconnect:
            raise error.TestFail('AutoConnect is %s, but we want it to be %s' %
                                 (autoconnect, self.autoconnect))

        self.autoconnect_changed = changed

        return self

    def __exit__(self, exception, value, traceback):
        """Restore autoconnect state if we changed it."""
        if not self.autoconnect_changed:
            return False

        try:
            self.PowerOnDevice(self.device)
        except Exception as e:
            if exception:
                # The 'with' body already failed; report this secondary
                # failure but let the original exception propagate.
                logger.error(
                    'Exiting AutoConnectContext with one exception, but '
                    'PowerOnDevice raised another')
                logger.error(
                    'Swallowing PowerOnDevice exception %s', e)
                return False
            raise

        # TODO(jglasgow): generalize to use services associated with
        # device, and restore state only on changed services
        service = self.flim.FindCellularService()
        if not service:
            logger.error('Cannot find cellular service. '
                         'Autoconnect state not restored.')
            return False
        # autoconnect_changed implies the previous value was the opposite
        # of the one requested, so restoring means negating it.
        service.SetProperty('AutoConnect', dbus.Boolean(not self.autoconnect))

        return False