#!/usr/bin/env python2 # Copyright (c) 2013 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. import cmd import dbus import dbus.types import dbus.exceptions import pm_constants import common from autotest_lib.client.cros.cellular import mm1_constants class PseudoModemClient(cmd.Cmd): """ Interactive client for PseudoModemManager. """ def __init__(self): cmd.Cmd.__init__(self) self.prompt = '> ' self._bus = dbus.SystemBus() def _get_proxy(self, path=pm_constants.TESTING_PATH): return self._bus.get_object(mm1_constants.I_MODEM_MANAGER, path) def _get_ism_proxy(self, state_machine): return self._get_proxy('/'.join([pm_constants.TESTING_PATH, state_machine])) def Begin(self): """ Starts the interactive shell. """ print '\nWelcome to the PseudoModemManager shell!\n' self.cmdloop() def can_exit(self): """Override""" return True def do_is_alive(self, args): """ Handles the 'is_alive' command. @params args: ignored. """ if args: print '\nCommand "is_alive" expects no arguments.\n' return print self._get_proxy().IsAlive(dbus_interface=pm_constants.I_TESTING) def help_is_alive(self): """ Handles the 'help is_alive' command. """ print '\nChecks that pseudomodem child process is alive.\n' def do_properties(self, args): """ Handles the 'properties' command. @param args: Arguments to the command. Unused. """ if args: print '\nCommand "properties" expects no arguments.\n' return try: props = self._get_proxy().GetAll( pm_constants.I_TESTING, dbus_interface=mm1_constants.I_PROPERTIES) print '\nProperties: ' for k, v in props.iteritems(): print ' ' + k + ': ' + str(v) print except dbus.exceptions.DBusException as e: print ('\nAn error occurred while communicating with ' 'PseudoModemManager: ' + e.get_dbus_name() + ' - ' + e.message + '\n') return False def help_properties(self): """Handles the 'help properties' command.""" print '\nReturns the properties under the testing interface.\n' def do_sms(self, args): """ Simulates a received SMS. @param args: A string containing the sender and the text message content, in which everything before the first ' ' character belongs to the sender and everything else belongs to the message content. For example "Gandalf You shall not pass!" will be parsed into: sender="Gandalf" content="You shall not pass!" Pseudomodem doesn't distinguish between phone numbers and strings containing non-numeric characters for the sender field so args can contain pretty much anything. """ arglist = args.split(' ', 1) if len(arglist) != 2: print '\nMalformed SMS args: ' + args + '\n' return try: self._get_proxy().ReceiveSms( arglist[0], arglist[1], dbus_interface=pm_constants.I_TESTING) print '\nSMS sent!\n' except dbus.exceptions.DBusException as e: print ('\nAn error occurred while communicating with ' 'PseudoModemManager: ' + e.get_dbus_name() + ' - ' + e.message + '\n') return False def help_sms(self): """Handles the 'help sms' command.""" print '\nUsage: sms \n' def do_set(self, args): """ Handles various commands that start with 'set'. @param args: Defines the set command to be issued and its arguments. Currently supported commands are: set pco """ arglist = args.split(' ') if len(arglist) < 1: print '\nInvalid command: set ' + args + '\n' return if arglist[0] == 'pco': if len(arglist) == 1: arglist.append('') elif len(arglist) != 2: print '\nExpected: pco . Found: ' + args + '\n' return pco_value = arglist[1] try: pco_list = [dbus.types.Struct( [dbus.types.UInt32(1), dbus.types.Boolean(True), dbus.types.ByteArray(pco_value)], signature='ubay')] self._get_proxy().UpdatePco( pco_list, dbus_interface=pm_constants.I_TESTING) print '\nPCO value updated!\n' except dbus.exceptions.DBusException as e: print ('\nAn error occurred while communicating with ' 'PseudoModemManager: ' + e.get_dbus_name() + ' - ' + e.message + '\n') else: print '\nUnknown command: set ' + args + '\n' return False def help_set(self): """Handles the 'help set' command.""" print ('\nUsage: set pco \n can be empty to set' ' the PCO value to an empty list.\n') def _get_state_machine(self, args): arglist = args.split() if len(arglist) != 1: print '\nExpected one argument: Name of state machine\n' return None try: return self._get_ism_proxy(arglist[0]) except dbus.exceptions.DBusException as e: print '\nNo such interactive state machine.\n' print 'Error obtained: |%s|\n' % repr(e) return None def do_is_waiting(self, machine): """ Determine if a machine is waiting for an advance call. @param machine: Case sensitive name of the machine. @return: True if |machine| is waiting to be advanced by the user. """ ism = self._get_state_machine(machine) if not ism: return False try: is_waiting = ism.IsWaiting( dbus_interface=pm_constants.I_TESTING_ISM) print ('\nState machine is %swaiting.\n' % ('' if is_waiting else 'not ')) except dbus.exceptions.DBusException as e: print ('\nCould not determine if |%s| is waiting: |%s|\n' % (machine, repr(e))) return False def help_is_waiting(self): """Handles the 'help is_waiting' command""" print ('\nUsage: is_waiting \n' 'Check whether a state machine is waiting for user action. The ' 'waiting machine can be advanced using the |advance| command.\n' 'state-machine-name is the case sensitive name of the machine' 'whose status is to be queried.\n') def do_advance(self, machine): """ Advance the given state machine. @param machine: Case sensitive name of the state machine to advance. @returns: True if |machine| was successfully advanced, False otherwise. """ ism = self._get_state_machine(machine) if not ism: return False try: success = ism.Advance(dbus_interface=pm_constants.I_TESTING_ISM) print ('\nAdvanced!\n' if success else '\nCould not advance.\n') except dbus.exceptions.DBusException as e: print '\nError while advancing state machine: |%s|\n' % repr(e) return False def help_advance(self): """Handles the 'help advance' command""" print ('\nUsage: advance \n' 'Advance a waiting state machine to the next step.\n' 'state-machine-name is the case sensitive name of the machine' 'to advance.\n') def do_exit(self, args): """ Handles the 'exit' command. @param args: Arguments to the command. Unused. """ if args: print '\nCommand "exit" expects no arguments.\n' return resp = raw_input('Are you sure? (yes/no): ') if resp == 'yes': print '\nGoodbye!\n' return True if resp != 'no': print '\nDid not understand: ' + resp + '\n' return False def help_exit(self): """Handles the 'help exit' command.""" print ('\nExits the interpreter. Shuts down the pseudo modem manager ' 'if the interpreter was launched by running pseudomodem.py') do_EOF = do_exit help_EOF = help_exit def main(): """ main method, run when this module is executed as stand-alone. """ client = PseudoModemClient() client.Begin() if __name__ == '__main__': main()