#!/usr/bin/env python # Copyright (c) 2014 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. # This module is the entry point for pseudomodem. Though honestly, I can't think # of any case when you want to use this module directly. Instead, use the # |pseudomodem_context| module that provides a way to launch pseudomodem in a # child process. import argparse import dbus import dbus.mainloop.glib import gobject import imp import json import logging import os import os.path import signal import sys import testing import traceback import logging_setup import modem_cdma import modem_3gpp import modemmanager import sim import state_machine_factory as smf import common from autotest_lib.client.cros.cellular import mm1_constants # Flags used by pseudomodem modules only that are defined below in # ParserArguments. CLI_FLAG = '--cli' EXIT_ERROR_FILE_FLAG = '--exit-error-file' class PseudoModemManager(object): """ The main class to be used to launch the pseudomodem. There should be only one instance of this class that orchestrates pseudomodem. """ def Setup(self, opts): """ Call |Setup| to prepare pseudomodem to be launched. @param opts: The options accepted by pseudomodem. See top level function |ParseArguments| for details. """ self._opts = opts self._in_exit_sequence = False self._manager = None self._modem = None self._state_machine_factory = None self._sim = None self._mainloop = None self._dbus_loop = dbus.mainloop.glib.DBusGMainLoop() self._bus = dbus.SystemBus(private=True, mainloop=self._dbus_loop) self._bus_name = dbus.service.BusName(mm1_constants.I_MODEM_MANAGER, self._bus) logging.info('Exported dbus service with well known name: |%s|', self._bus_name.get_name()) self._SetupPseudomodemParts() logging.info('Pseudomodem setup completed!') def StartBlocking(self): """ Start pseudomodem operation. This call blocks untill |GracefulExit| is called from some other context. """ self._mainloop = gobject.MainLoop() self._mainloop.run() def GracefulExit(self): """ Stop pseudomodem operation and clean up. """ if self._in_exit_sequence: logging.debug('Already exiting.') return self._in_exit_sequence = True logging.info('pseudomodem shutdown sequence initiated...') # Guard each step by its own try...catch, because we want to attempt # each step irrespective of whether the earlier ones succeeded. try: if self._manager: self._manager.Remove(self._modem) except Exception as e: logging.warning('Error while exiting: %s', repr(e)) try: if self._mainloop: self._mainloop.quit() except Exception as e: logging.warning('Error while exiting: %s', repr(e)) logging.info('pseudomodem: Bye! Bye!') def _SetupPseudomodemParts(self): """ Contructs all pseudomodem objects, but does not start operation. Three main objects are created: the |Modem|, the |Sim|, and the |StateMachineFactory|. This objects may be instantiations of the default classes, or of user provided classes, depending on options provided. """ self._ReadCustomParts() use_3gpp = (self._opts.family == '3GPP') if not self._modem and not self._state_machine_factory: self._state_machine_factory = smf.StateMachineFactory() logging.info('Created default state machine factory.') if use_3gpp and not self._sim: self._sim = sim.SIM(sim.SIM.Carrier('test'), mm1_constants.MM_MODEM_ACCESS_TECHNOLOGY_GSM, locked=self._opts.locked) logging.info('Created default 3GPP SIM.') # Store this constant here because the variable name is too long. network_available = dbus.types.UInt32( mm1_constants.MM_MODEM_3GPP_NETWORK_AVAILABILITY_AVAILABLE) if not self._modem: if use_3gpp: technology_gsm = dbus.types.UInt32( mm1_constants.MM_MODEM_ACCESS_TECHNOLOGY_GSM) networks = [modem_3gpp.Modem3gpp.GsmNetwork( 'Roaming Network Long ' + str(i), 'Roaming Network Short ' + str(i), '00100' + str(i + 1), network_available, technology_gsm) for i in xrange(self._opts.roaming_networks)] # TODO(armansito): Support "not activated" initialization option # for 3GPP carriers. self._modem = modem_3gpp.Modem3gpp( self._state_machine_factory, roaming_networks=networks) logging.info('Created default 3GPP modem.') else: self._modem = modem_cdma.ModemCdma( self._state_machine_factory, modem_cdma.ModemCdma.CdmaNetwork( activated=self._opts.activated)) logging.info('Created default CDMA modem.') # Everyone gets the |_bus|, woohoo! self._manager = modemmanager.ModemManager(self._bus) self._modem.SetBus(self._bus) # Also sets it on StateMachineFactory. self._manager.Add(self._modem) # Unfortunately, setting the SIM has to be deferred until everyone has # their BUS set. |self._sim| exists if the user provided one, or if the # modem family is |3GPP|. if self._sim: self._modem.SetSIM(self._sim) # The testing interface can be brought up now that we have the bus. self._testing_object = testing.Testing(self._modem, self._bus) def _ReadCustomParts(self): """ Loads user provided implementations of pseudomodem objects. The user can provide their own implementations of the |Modem|, |Sim| or |StateMachineFactory| classes. """ if not self._opts.test_module: return test_module = self._LoadCustomPartsModule(self._opts.test_module) if self._opts.test_modem_class: self._modem = self._CreateCustomObject(test_module, self._opts.test_modem_class, self._opts.test_modem_arg) if self._opts.test_sim_class: self._sim = self._CreateCustomObject(test_module, self._opts.test_sim_class, self._opts.test_sim_arg) if self._opts.test_state_machine_factory_class: if self._opts.test_modem_class: logging.warning( 'User provided a |Modem| implementation as well as a ' '|StateMachineFactory|. Ignoring the latter.') else: self._state_machine_factory = self._CreateCustomObject( test_module, self._opts.test_state_machine_factory_class, self._opts.test_state_machine_factory_arg) def _CreateCustomObject(self, test_module, class_name, arg_file_name): """ Create the custom object specified by test. @param test_module: The loaded module that implemets the custom object. @param class_name: Name of the class implementing the custom object. @param arg_file_name: Absolute path to file containing list of arguments taken by |test_module|.|class_name| constructor in json. @returns: A brand new object of the custom type. @raises: AttributeError if the class definition is not found; ValueError if |arg_file| does not contain valid json representaiton of a python list. Other errors may be raised during object creation. """ arg = None if arg_file_name: arg_file = open(arg_file_name, 'rb') try: arg = json.load(arg_file) finally: arg_file.close() if not isinstance(arg, list): raise ValueError('Argument must be a python list.') class_def = getattr(test_module, class_name) try: if arg: logging.debug('Loading test class %s%s', class_name, str(arg)) return class_def(*arg) else: logging.debug('Loading test class %s', class_def) return class_def() except Exception as e: logging.error('Exception raised when instantiating class %s: %s', class_name, str(e)) raise def _LoadCustomPartsModule(self, module_abs_path): """ Loads the given file as a python module. The loaded module *is* added to |sys.modules|. @param module_abs_path: Absolute path to the file to be loaded. @returns: The loaded module. @raises: ImportError if the module can not be loaded, or if another module with the same name is already loaded. """ path, name = os.path.split(module_abs_path) name, _ = os.path.splitext(name) if name in sys.modules: raise ImportError('A module named |%s| is already loaded.' % name) logging.debug('Loading module %s from %s', name, path) module_file, filepath, data = imp.find_module(name, [path]) try: module = imp.load_module(name, module_file, filepath, data) except Exception as e: logging.error( 'Exception raised when loading test module from %s: %s', module_abs_path, str(e)) raise finally: module_file.close() return module # ############################################################################## # Public static functions. def ParseArguments(arg_string=None): """ The main argument parser. Pseudomodem is a command line tool. Since pseudomodem is a highly customizable tool, the command line arguments are expected to be quite complex. We use argparse to keep the command line options easy to use. @param arg_string: If not None, the string to parse. If none, |sys.argv| is used to obtain the argument string. @returns: The parsed options object. """ parser = argparse.ArgumentParser( description="Run pseudomodem to simulate a modem using the " "modemmanager-next DBus interface.") parser.add_argument( CLI_FLAG, action='store_true', default=False, help='Launch the command line interface in foreground to interact ' 'with the launched pseudomodem process. This argument is used ' 'by |pseudomodem_context|. pseudomodem itself ignores it.') parser.add_argument( EXIT_ERROR_FILE_FLAG, default=None, help='If provided, full path to file to which pseudomodem should ' 'dump the error condition before exiting, in case of a crash. ' 'The file is not created if it does not already exist.') modem_arguments = parser.add_argument_group( title='Modem options', description='Options to customize the modem exported.') modem_arguments.add_argument( '--family', '-f', choices=['3GPP', 'CDMA'], default='3GPP') gsm_arguments = parser.add_argument_group( title='3GPP options', description='Options specific to 3GPP modems. [Only make sense ' 'when modem family is 3GPP]') gsm_arguments.add_argument( '--roaming-networks', '-r', type=_NonNegInt, default=0, metavar='<# networks>', help='Number of roaming networks available') cdma_arguments = parser.add_argument_group( title='CDMA options', description='Options specific to CDMA modems. [Only make sense ' 'when modem family is CDMA]') sim_arguments = parser.add_argument_group( title='SIM options', description='Options to customize the SIM in the modem. [Only make ' 'sense when modem family is 3GPP]') sim_arguments.add_argument( '--activated', type=bool, default=True, help='Determine whether the SIM is activated') sim_arguments.add_argument( '--locked', '-l', type=bool, default=False, help='Determine whether the SIM is in locked state') testing_arguments = parser.add_argument_group( title='Testing interface options', description='Options to modify how the tests or user interacts ' 'with pseudomodem') testing_arguments = parser.add_argument( '--interactive-state-machines-all', type=bool, default=False, help='Launch all state machines in interactive mode.') testing_arguments = parser.add_argument( '--interactive-state-machine', type=str, default=None, help='Launch the specified state machine in interactive mode. May ' 'be repeated to specify multiple machines.') customize_arguments = parser.add_argument_group( title='Customizable modem options', description='Options to customize the emulated modem.') customize_arguments.add_argument( '--test-module', type=str, default=None, metavar='CUSTOM_MODULE', help='Absolute path to the module with custom definitions.') customize_arguments.add_argument( '--test-modem-class', type=str, default=None, metavar='MODEM_CLASS', help='Name of the class in CUSTOM_MODULE that implements the modem ' 'to load.') customize_arguments.add_argument( '--test-modem-arg', type=str, default=None, help='Absolute path to the json description of argument list ' 'taken by MODEM_CLASS.') customize_arguments.add_argument( '--test-sim-class', type=str, default=None, metavar='SIM_CLASS', help='Name of the class in CUSTOM_MODULE that implements the SIM ' 'to load.') customize_arguments.add_argument( '--test-sim-arg', type=str, default=None, help='Aboslute path to the json description of argument list ' 'taken by SIM_CLASS') customize_arguments.add_argument( '--test-state-machine-factory-class', type=str, default=None, metavar='SMF_CLASS', help='Name of the class in CUSTOM_MODULE that impelements the ' 'state machine factory to load. Only used if MODEM_CLASS is ' 'not provided.') customize_arguments.add_argument( '--test-state-machine-factory-arg', type=str, default=None, help='Absolute path to the json description of argument list ' 'taken by SMF_CLASS') opts = parser.parse_args(arg_string) # Extra sanity checks. if opts.family == 'CDMA' and opts.roaming_networks > 0: raise argparse.ArgumentTypeError('CDMA networks do not support ' 'roaming networks.') test_objects = (opts.test_modem_class or opts.test_sim_class or opts.test_state_machine_factory_class) if not opts.test_module and test_objects: raise argparse.ArgumentTypeError('test_module is required with any ' 'other customization arguments.') if opts.test_modem_class and opts.test_state_machine_factory_class: logging.warning('test-state-machine-factory-class will be ignored ' 'because test-modem-class was provided.') return opts def ExtractExitError(dump_file_path): """ Gets the exit error left behind by a crashed pseudomodem. If there is a file at |dump_file_path|, extracts the error and the traceback left behind by the child process. This function is intended to be used by the launching process to parse the error file left behind by pseudomodem. @param dump_file_path: Full path to the file to read. @returns: (error_reason, error_traceback) error_reason: str. The one line reason for error that should be used to raise exceptions. error_traceback: A list of str. This is the traceback left behind by the child process, if any. May be []. """ error_reason = 'No detailed reason found :(' error_traceback = [] if dump_file_path: try: dump_file = open(dump_file_path, 'rb') error_reason = dump_file.readline().strip() error_traceback = dump_file.readlines() dump_file.close() except OSError as e: logging.error('Could not open dump file %s: %s', dump_file_path, str(e)) return error_reason, error_traceback # The single global instance of PseudoModemManager. _pseudo_modem_manager = None # ############################################################################## # Private static functions. def _NonNegInt(value): value = int(value) if value < 0: raise argparse.ArgumentTypeError('%s is not a non-negative int' % value) return value def _DumpExitError(dump_file_path, exc): """ Dump information about the raised exception in the exit error file. Format of file dumped: - First line is the reason for the crash. - Subsequent lines are the traceback from the exception raised. We expect the file to exist, because we want the launching context (that will eventually read the error dump) to create and own the file. @param dump_file_path: Full path to file to which we should dump. @param exc: The exception raised. """ if not dump_file_path: return if not os.path.isfile(dump_file_path): logging.error('File |%s| does not exist. Can not dump exit error.', dump_file_path) return try: dump_file = open(dump_file_path, 'wb') except IOError as e: logging.error('Could not open file |%s| to dump exit error. ' 'Exception raised when opening file: %s', dump_file_path, str(e)) return dump_file.write(str(exc) + '\n') dump_file.writelines(traceback.format_exc()) dump_file.close() def sig_handler(signum, frame): """ Top level signal handler to handle user interrupt. @param signum: The signal received. @param frame: Ignored. """ global _pseudo_modem_manager logging.debug('Signal handler called with signal %d', signum) if _pseudo_modem_manager: _pseudo_modem_manager.GracefulExit() def main(): """ This is the entry point for raw pseudomodem. You should not be running this module as a script. If you're trying to run pseudomodem from the command line, see |pseudomodem_context| module. """ global _pseudo_modem_manager logging_setup.SetupLogging() logging.info('Pseudomodem commandline: [%s]', str(sys.argv)) opts = ParseArguments() signal.signal(signal.SIGINT, sig_handler) signal.signal(signal.SIGTERM, sig_handler) try: _pseudo_modem_manager = PseudoModemManager() _pseudo_modem_manager.Setup(opts) _pseudo_modem_manager.StartBlocking() except Exception as e: logging.error('Caught exception at top level: %s', str(e)) _DumpExitError(opts.exit_error_file, e) _pseudo_modem_manager.GracefulExit() raise if __name__ == '__main__': main()