# Copyright (c) 2012 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. import logging import pm_errors import state_machine from autotest_lib.client.cros.cellular import mm1_constants class DisconnectMachine(state_machine.StateMachine): """ DisconnectMachine handles the state transitions involved in bringing the modem to the DISCONNECTED state. """ def __init__(self, modem, bearer_path, return_cb, raise_cb, return_cb_args=[]): super(DisconnectMachine, self).__init__(modem) self.bearer_path = bearer_path self.return_cb = return_cb self.raise_cb = raise_cb self.return_cb_args = return_cb_args def _HandleConnectedState(self): logging.info('DisconnectMachine: Modem state is CONNECTED.') logging.info('DisconnectMachine: Setting state to DISCONNECTING.') reason = mm1_constants.MM_MODEM_STATE_CHANGE_REASON_USER_REQUESTED self._modem.ChangeState(mm1_constants.MM_MODEM_STATE_DISCONNECTING, reason) return True def _HandleDisconnectingState(self): logging.info('DisconnectMachine: Modem state is DISCONNECTING.') assert not self._modem.IsPendingConnect() assert not self._modem.IsPendingEnable() assert not self._modem.IsPendingRegister() dc_reason = mm1_constants.MM_MODEM_STATE_CHANGE_REASON_USER_REQUESTED try: if self.bearer_path == mm1_constants.ROOT_PATH: for bearer in self._modem.active_bearers.keys(): self._modem.DeactivateBearer(bearer) else: self._modem.DeactivateBearer(self.bearer_path) except pm_errors.MMError as e: logging.error('DisconnectMachine: Failed to disconnect: ' + str(e)) dc_reason = mm1_constants.MM_MODEM_STATE_CHANGE_REASON_UNKNOWN self.raise_cb(e) finally: # TODO(armansito): What should happen in a disconnect # failure? Should we stay connected or become REGISTERED? logging.info('DisconnectMachine: Setting state to REGISTERED.') self._modem.ChangeState(mm1_constants.MM_MODEM_STATE_REGISTERED, dc_reason) self._modem.disconnect_step = None logging.info('DisconnectMachine: Calling return callback.') self.return_cb(*self.return_cb_args) return False def _GetModemStateFunctionMap(self): return { mm1_constants.MM_MODEM_STATE_CONNECTED: DisconnectMachine._HandleConnectedState, mm1_constants.MM_MODEM_STATE_DISCONNECTING: DisconnectMachine._HandleDisconnectingState } def _ShouldStartStateMachine(self): if (self._modem.disconnect_step and # There is already a disconnect operation in progress. self._modem.disconnect_step != self): message = 'There is already an ongoing disconnect operation.' logging.error(message) self.raise_cb( pm_errors.MMCoreError(pm_errors.MMCoreError.IN_PROGRESS, message)) return False elif self._modem.disconnect_step is None: # There is no disconnect operation going on, canceled or otherwise. state = self._modem.Get(mm1_constants.I_MODEM, 'State') if state != mm1_constants.MM_MODEM_STATE_CONNECTED: message = 'Modem cannot be disconnected when not connected.' logging.error(message) self.raise_cb( pm_errors.MMCoreError(pm_errors.MMCoreError.WRONG_STATE, message)) return False if self.bearer_path == mm1_constants.ROOT_PATH: logging.info('All bearers will be disconnected.') elif not (self.bearer_path in self._modem.bearers): message = ('Bearer with path "%s" not found' % self.bearer_path) logging.error(message) self.raise_cb( pm_errors.MMCoreError(pm_errors.MMCoreError.NOT_FOUND, message)) return False elif not (self.bearer_path in self._modem.active_bearers): message = ('No active bearer with path ' + self.bearer_path + ' found, current active bearers are ' + str(self._modem.active_bearers)) logging.error(message) self.raise_cb(pm_errors.MMCoreError( pm_errors.MMCoreError.NOT_FOUND, message)) return False assert not self._modem.IsPendingConnect() assert not self._modem.IsPendingEnable() assert not self._modem.IsPendingRegister() logging.info('Starting Disconnect.') self._modem.disconnect_step = self return True