# Copyright (c) 2012 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. import logging import pm_errors import state_machine from autotest_lib.client.cros.cellular import mm1_constants class RegisterMachine(state_machine.StateMachine): """ RegisterMachine handles the state transitions involved in bringing the modem to the REGISTERED state. """ def __init__(self, modem, operator_code="", return_cb=None, raise_cb=None): super(RegisterMachine, self).__init__(modem) self._networks = None self._operator_code = operator_code self._return_cb = return_cb self._raise_cb = raise_cb def Cancel(self): """ Overriden from superclass. """ logging.info('RegisterMachine: Canceling register.') super(RegisterMachine, self).Cancel() state = self._modem.Get(mm1_constants.I_MODEM, 'State') reason = mm1_constants.MM_MODEM_STATE_CHANGE_REASON_USER_REQUESTED if state == mm1_constants.MM_MODEM_STATE_SEARCHING: logging.info('RegisterMachine: Setting state to ENABLED.') self._modem.ChangeState(mm1_constants.MM_MODEM_STATE_ENABLED, reason) self._modem.SetRegistrationState( mm1_constants.MM_MODEM_3GPP_REGISTRATION_STATE_IDLE) self._modem.register_step = None if self._raise_cb: self._raise_cb( pm_errors.MMCoreError(pm_errors.MMCoreError.CANCELLED, 'Cancelled')) def _HandleEnabledState(self): logging.info('RegisterMachine: Modem is ENABLED.') logging.info('RegisterMachine: Setting registration state ' 'to SEARCHING.') self._modem.SetRegistrationState( mm1_constants.MM_MODEM_3GPP_REGISTRATION_STATE_SEARCHING) logging.info('RegisterMachine: Setting state to SEARCHING.') reason = mm1_constants.MM_MODEM_STATE_CHANGE_REASON_USER_REQUESTED self._modem.ChangeState(mm1_constants.MM_MODEM_STATE_SEARCHING, reason) logging.info('RegisterMachine: Starting network scan.') try: self._networks = self._modem.SyncScan() except pm_errors.MMError as e: self._modem.register_step = None logging.error('An error occurred during network scan: ' + str(e)) self._modem.ChangeState( mm1_constants.MM_MODEM_STATE_ENABLED, mm1_constants.MODEM_STATE_CHANGE_REASON_UNKNOWN) self._modem.SetRegistrationState( mm1_constants.MM_MODEM_3GPP_REGISTRATION_STATE_IDLE) if self._raise_cb: self._raise_cb(e) return False logging.info('RegisterMachine: Found networks: ' + str(self._networks)) return True def _HandleSearchingState(self): logging.info('RegisterMachine: Modem is SEARCHING.') if not self._networks: logging.info('RegisterMachine: Scan returned no networks.') logging.info('RegisterMachine: Setting state to ENABLED.') self._modem.ChangeState( mm1_constants.MM_MODEM_STATE_ENABLED, mm1_constants.MM_MODEM_STATE_CHANGE_REASON_UNKNOWN) # TODO(armansito): Figure out the correct registration # state to transition to when no network is present. logging.info('RegisterMachine: Setting registration state ' 'to IDLE.') self._modem.SetRegistrationState( mm1_constants.MM_MODEM_3GPP_REGISTRATION_STATE_IDLE) self._modem.register_step = None if self._raise_cb: self._raise_cb(pm_errors.MMMobileEquipmentError( pm_errors.MMMobileEquipmentError.NO_NETWORK, 'No networks were found to register.')) return False # Pick the last network in the list. Roaming networks will come before # the home network which makes the last item in the list the home # network. if self._operator_code: if not self._operator_code in self._modem.scanned_networks: if self._raise_cb: self._raise_cb(pm_errors.MMCoreError( pm_errors.MMCoreError.FAILED, "Unknown network: " + self._operator_code)) return False network = self._modem.scanned_networks[self._operator_code] else: network = self._networks[-1] logging.info( 'RegisterMachine: Registering to network: ' + str(network)) self._modem.SetRegistered( network['operator-code'], network['operator-long']) # The previous call should have set the state to REGISTERED. self._modem.register_step = None if self._return_cb: self._return_cb() return False def _GetModemStateFunctionMap(self): return { mm1_constants.MM_MODEM_STATE_ENABLED: RegisterMachine._HandleEnabledState, mm1_constants.MM_MODEM_STATE_SEARCHING: RegisterMachine._HandleSearchingState } def _ShouldStartStateMachine(self): if self._modem.register_step and self._modem.register_step != self: # There is already an ongoing register operation. message = 'Register operation already in progress.' logging.info(message) error = pm_errors.MMCoreError(pm_errors.MMCoreError.IN_PROGRESS, message) if self._raise_cb: self._raise_cb(error) else: raise error elif self._modem.register_step is None: # There is no register operation going on, canceled or otherwise. state = self._modem.Get(mm1_constants.I_MODEM, 'State') if state != mm1_constants.MM_MODEM_STATE_ENABLED: message = 'Cannot initiate register while in state %d, ' \ 'state needs to be ENABLED.' % state error = pm_errors.MMCoreError(pm_errors.MMCoreError.WRONG_STATE, message) if self._raise_cb: self._raise_cb(error) else: raise error logging.info('Starting Register.') self._modem.register_step = self return True