1#!/usr/bin/env python2 2 3# Copyright (c) 2013 The Chromium OS Authors. All rights reserved. 4# Use of this source code is governed by a BSD-style license that can be 5# found in the LICENSE file. 6 7import cmd 8import dbus 9import dbus.types 10import dbus.exceptions 11 12import pm_constants 13 14import common 15from autotest_lib.client.cros.cellular import mm1_constants 16 17class PseudoModemClient(cmd.Cmd): 18 """ 19 Interactive client for PseudoModemManager. 20 21 """ 22 def __init__(self): 23 cmd.Cmd.__init__(self) 24 self.prompt = '> ' 25 self._bus = dbus.SystemBus() 26 27 28 def _get_proxy(self, path=pm_constants.TESTING_PATH): 29 return self._bus.get_object(mm1_constants.I_MODEM_MANAGER, path) 30 31 32 def _get_ism_proxy(self, state_machine): 33 return self._get_proxy('/'.join([pm_constants.TESTING_PATH, 34 state_machine])) 35 36 37 def Begin(self): 38 """ 39 Starts the interactive shell. 40 41 """ 42 print '\nWelcome to the PseudoModemManager shell!\n' 43 self.cmdloop() 44 45 46 def can_exit(self): 47 """Override""" 48 return True 49 50 51 def do_is_alive(self, args): 52 """ 53 Handles the 'is_alive' command. 54 55 @params args: ignored. 56 57 """ 58 if args: 59 print '\nCommand "is_alive" expects no arguments.\n' 60 return 61 print self._get_proxy().IsAlive(dbus_interface=pm_constants.I_TESTING) 62 63 64 def help_is_alive(self): 65 """ Handles the 'help is_alive' command. """ 66 print '\nChecks that pseudomodem child process is alive.\n' 67 68 69 def do_properties(self, args): 70 """ 71 Handles the 'properties' command. 72 73 @param args: Arguments to the command. Unused. 74 75 """ 76 if args: 77 print '\nCommand "properties" expects no arguments.\n' 78 return 79 try: 80 props = self._get_proxy().GetAll( 81 pm_constants.I_TESTING, 82 dbus_interface=mm1_constants.I_PROPERTIES) 83 print '\nProperties: ' 84 for k, v in props.iteritems(): 85 print ' ' + k + ': ' + str(v) 86 print 87 except dbus.exceptions.DBusException as e: 88 print ('\nAn error occurred while communicating with ' 89 'PseudoModemManager: ' + e.get_dbus_name() + ' - ' + 90 e.message + '\n') 91 return False 92 93 94 def help_properties(self): 95 """Handles the 'help properties' command.""" 96 print '\nReturns the properties under the testing interface.\n' 97 98 99 def do_sms(self, args): 100 """ 101 Simulates a received SMS. 102 103 @param args: A string containing the sender and the text message 104 content, in which everything before the first ' ' character 105 belongs to the sender and everything else belongs to the 106 message content. For example "Gandalf You shall not pass!" 107 will be parsed into: 108 109 sender="Gandalf" 110 content="You shall not pass!" 111 112 Pseudomodem doesn't distinguish between phone numbers and 113 strings containing non-numeric characters for the sender field 114 so args can contain pretty much anything. 115 116 """ 117 arglist = args.split(' ', 1) 118 if len(arglist) != 2: 119 print '\nMalformed SMS args: ' + args + '\n' 120 return 121 try: 122 self._get_proxy().ReceiveSms( 123 arglist[0], arglist[1], 124 dbus_interface=pm_constants.I_TESTING) 125 print '\nSMS sent!\n' 126 except dbus.exceptions.DBusException as e: 127 print ('\nAn error occurred while communicating with ' 128 'PseudoModemManager: ' + e.get_dbus_name() + ' - ' + 129 e.message + '\n') 130 return False 131 132 133 def help_sms(self): 134 """Handles the 'help sms' command.""" 135 print '\nUsage: sms <sender phone #> <message text>\n' 136 137 138 def do_set(self, args): 139 """ 140 Handles various commands that start with 'set'. 141 142 @param args: Defines the set command to be issued and its 143 arguments. Currently supported commands are: 144 145 set pco <pco-value> 146 147 """ 148 arglist = args.split(' ') 149 if len(arglist) < 1: 150 print '\nInvalid command: set ' + args + '\n' 151 return 152 if arglist[0] == 'pco': 153 if len(arglist) == 1: 154 arglist.append('') 155 elif len(arglist) != 2: 156 print '\nExpected: pco <pco-value>. Found: ' + args + '\n' 157 return 158 pco_value = arglist[1] 159 try: 160 pco_list = [dbus.types.Struct( 161 [dbus.types.UInt32(1), 162 dbus.types.Boolean(True), 163 dbus.types.ByteArray(pco_value)], 164 signature='ubay')] 165 self._get_proxy().UpdatePco( 166 pco_list, dbus_interface=pm_constants.I_TESTING) 167 print '\nPCO value updated!\n' 168 except dbus.exceptions.DBusException as e: 169 print ('\nAn error occurred while communicating with ' 170 'PseudoModemManager: ' + e.get_dbus_name() + ' - ' + 171 e.message + '\n') 172 else: 173 print '\nUnknown command: set ' + args + '\n' 174 return False 175 176 177 def help_set(self): 178 """Handles the 'help set' command.""" 179 print ('\nUsage: set pco <pco-value>\n<pco-value> can be empty to set' 180 ' the PCO value to an empty list.\n') 181 182 183 def _get_state_machine(self, args): 184 arglist = args.split() 185 if len(arglist) != 1: 186 print '\nExpected one argument: Name of state machine\n' 187 return None 188 try: 189 return self._get_ism_proxy(arglist[0]) 190 except dbus.exceptions.DBusException as e: 191 print '\nNo such interactive state machine.\n' 192 print 'Error obtained: |%s|\n' % repr(e) 193 return None 194 195 196 def do_is_waiting(self, machine): 197 """ 198 Determine if a machine is waiting for an advance call. 199 200 @param machine: Case sensitive name of the machine. 201 @return: True if |machine| is waiting to be advanced by the user. 202 203 """ 204 ism = self._get_state_machine(machine) 205 if not ism: 206 return False 207 208 try: 209 is_waiting = ism.IsWaiting( 210 dbus_interface=pm_constants.I_TESTING_ISM) 211 print ('\nState machine is %swaiting.\n' % 212 ('' if is_waiting else 'not ')) 213 except dbus.exceptions.DBusException as e: 214 print ('\nCould not determine if |%s| is waiting: |%s|\n' % 215 (machine, repr(e))) 216 return False 217 218 219 def help_is_waiting(self): 220 """Handles the 'help is_waiting' command""" 221 print ('\nUsage: is_waiting <state-machine-name>\n' 222 'Check whether a state machine is waiting for user action. The ' 223 'waiting machine can be advanced using the |advance| command.\n' 224 'state-machine-name is the case sensitive name of the machine' 225 'whose status is to be queried.\n') 226 227 228 def do_advance(self, machine): 229 """ 230 Advance the given state machine. 231 232 @param machine: Case sensitive name of the state machine to advance. 233 @returns: True if |machine| was successfully advanced, False otherwise. 234 235 """ 236 ism = self._get_state_machine(machine) 237 if not ism: 238 return False 239 240 try: 241 success = ism.Advance(dbus_interface=pm_constants.I_TESTING_ISM) 242 print ('\nAdvanced!\n' if success else '\nCould not advance.\n') 243 except dbus.exceptions.DBusException as e: 244 print '\nError while advancing state machine: |%s|\n' % repr(e) 245 return False 246 247 248 def help_advance(self): 249 """Handles the 'help advance' command""" 250 print ('\nUsage: advance <state-machine-name>\n' 251 'Advance a waiting state machine to the next step.\n' 252 'state-machine-name is the case sensitive name of the machine' 253 'to advance.\n') 254 255 256 def do_exit(self, args): 257 """ 258 Handles the 'exit' command. 259 260 @param args: Arguments to the command. Unused. 261 262 """ 263 if args: 264 print '\nCommand "exit" expects no arguments.\n' 265 return 266 resp = raw_input('Are you sure? (yes/no): ') 267 if resp == 'yes': 268 print '\nGoodbye!\n' 269 return True 270 if resp != 'no': 271 print '\nDid not understand: ' + resp + '\n' 272 return False 273 274 275 def help_exit(self): 276 """Handles the 'help exit' command.""" 277 print ('\nExits the interpreter. Shuts down the pseudo modem manager ' 278 'if the interpreter was launched by running pseudomodem.py') 279 280 281 do_EOF = do_exit 282 help_EOF = help_exit 283 284 285def main(): 286 """ main method, run when this module is executed as stand-alone. """ 287 client = PseudoModemClient() 288 client.Begin() 289 290 291if __name__ == '__main__': 292 main() 293