• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python2
2
3# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
4# Use of this source code is governed by a BSD-style license that can be
5# found in the LICENSE file.
6
7import cmd
8import dbus
9import dbus.types
10import dbus.exceptions
11
12import pm_constants
13
14import common
15from autotest_lib.client.cros.cellular import mm1_constants
16
class PseudoModemClient(cmd.Cmd):
    """
    Interactive client for PseudoModemManager.

    Every do_<name> method implements the shell command <name> and every
    help_<name> method prints the help text for it. As required by cmd.Cmd,
    a do_* method returns a true value only when the command loop should
    terminate; all commands other than 'exit' therefore return False.

    """
    def __init__(self):
        cmd.Cmd.__init__(self)
        self.prompt = '> '
        self._bus = dbus.SystemBus()


    def _get_proxy(self, path=pm_constants.TESTING_PATH):
        """
        Obtains a D-Bus proxy object from the system bus.

        @param path: Object path to look up on the
                mm1_constants.I_MODEM_MANAGER bus name. Defaults to
                pm_constants.TESTING_PATH.
        @return: The proxy returned by the bus for |path|.

        """
        return self._bus.get_object(mm1_constants.I_MODEM_MANAGER, path)


    def _get_ism_proxy(self, state_machine):
        """
        Obtains the proxy of an interactive state machine.

        @param state_machine: Name of the state machine. It is appended to
                pm_constants.TESTING_PATH to form the object path.
        @return: The proxy for the resulting object path.

        """
        return self._get_proxy('/'.join([pm_constants.TESTING_PATH,
                                         state_machine]))


    def _print_dbus_error(self, error):
        """
        Reports a failed D-Bus call to the user.

        @param error: The dbus.exceptions.DBusException that was caught.

        """
        # %-formatting is used instead of string concatenation because
        # get_dbus_name() may return None when the error carries no D-Bus
        # error name, and concatenating None would raise a TypeError.
        print('\nAn error occurred while communicating with '
              'PseudoModemManager: %s - %s\n' %
              (error.get_dbus_name(), error.message))


    def Begin(self):
        """
        Starts the interactive shell.

        Prints a welcome banner and then blocks in the command loop until a
        command asks for termination.

        """
        print('\nWelcome to the PseudoModemManager shell!\n')
        self.cmdloop()


    def can_exit(self):
        """Override"""
        return True


    def do_is_alive(self, args):
        """
        Handles the 'is_alive' command.

        Prints the value returned by the IsAlive method of the testing
        interface, or an error report if the D-Bus call fails.

        @param args: Must be empty; the command takes no arguments.
        @return: False, so that the command loop keeps running.

        """
        if args:
            print('\nCommand "is_alive" expects no arguments.\n')
            return False
        try:
            print(self._get_proxy().IsAlive(
                    dbus_interface=pm_constants.I_TESTING))
        except dbus.exceptions.DBusException as e:
            # Report the failure like every other command does instead of
            # letting the exception tear down the shell.
            self._print_dbus_error(e)
        return False


    def help_is_alive(self):
        """ Handles the 'help is_alive' command. """
        print('\nChecks that pseudomodem child process is alive.\n')


    def do_properties(self, args):
        """
        Handles the 'properties' command.

        Fetches all properties of the testing interface and prints them one
        per line as '<name>: <value>'.

        @param args: Must be empty; the command takes no arguments.
        @return: False, so that the command loop keeps running.

        """
        if args:
            print('\nCommand "properties" expects no arguments.\n')
            return False
        try:
            props = self._get_proxy().GetAll(
                            pm_constants.I_TESTING,
                            dbus_interface=mm1_constants.I_PROPERTIES)
            print('\nProperties: ')
            for k, v in props.items():
                print('   ' + k + ': ' + str(v))
            print('')
        except dbus.exceptions.DBusException as e:
            self._print_dbus_error(e)
        return False


    def help_properties(self):
        """Handles the 'help properties' command."""
        print('\nReturns the properties under the testing interface.\n')


    def do_sms(self, args):
        """
        Simulates a received SMS.

        @param args: A string containing the sender and the text message
                content, in which everything before the first ' ' character
                belongs to the sender and everything else belongs to the
                message content. For example "Gandalf You shall not pass!"
                will be parsed into:

                    sender="Gandalf"
                    content="You shall not pass!"

                Pseudomodem doesn't distinguish between phone numbers and
                strings containing non-numeric characters for the sender field
                so args can contain pretty much anything.
        @return: False, so that the command loop keeps running.

        """
        arglist = args.split(' ', 1)
        if len(arglist) != 2:
            print('\nMalformed SMS args: ' + args + '\n')
            return False
        sender, content = arglist
        try:
            self._get_proxy().ReceiveSms(
                    sender, content,
                    dbus_interface=pm_constants.I_TESTING)
            print('\nSMS sent!\n')
        except dbus.exceptions.DBusException as e:
            self._print_dbus_error(e)
        return False


    def help_sms(self):
        """Handles the 'help sms' command."""
        print('\nUsage: sms <sender phone #> <message text>\n')


    def do_set(self, args):
        """
        Handles various commands that start with 'set'.

        @param args: Defines the set command to be issued and its
                arguments. Currently supported commands are:

                  set pco <pco-value>

                <pco-value> may be omitted, in which case an empty value is
                sent.
        @return: False, so that the command loop keeps running.

        """
        # Split on any run of whitespace: split(' ') never yields an empty
        # list (so the check below could not trigger) and turns repeated
        # spaces into bogus empty tokens.
        arglist = args.split()
        if not arglist:
            print('\nInvalid command: set ' + args + '\n')
            return False
        if arglist[0] != 'pco':
            print('\nUnknown command: set ' + args + '\n')
            return False
        if len(arglist) > 2:
            print('\nExpected: pco <pco-value>. Found: ' + args + '\n')
            return False

        pco_value = arglist[1] if len(arglist) == 2 else ''
        try:
            pco_list = [dbus.types.Struct(
                [dbus.types.UInt32(1),
                    dbus.types.Boolean(True),
                    dbus.types.ByteArray(pco_value)],
                signature='ubay')]
            self._get_proxy().UpdatePco(
                    pco_list, dbus_interface=pm_constants.I_TESTING)
            print('\nPCO value updated!\n')
        except dbus.exceptions.DBusException as e:
            self._print_dbus_error(e)
        return False


    def help_set(self):
        """Handles the 'help set' command."""
        print('\nUsage: set pco <pco-value>\n<pco-value> can be empty to set'
              ' the PCO value to an empty list.\n')


    def _get_state_machine(self, args):
        """
        Resolves the argument of a state machine command to a proxy.

        @param args: Argument string of the command. Must contain exactly
                one whitespace separated token, the state machine name.
        @return: The state machine proxy, or None if |args| is malformed or
                obtaining the proxy raised a DBusException. A message is
                printed in both failure cases.

        """
        arglist = args.split()
        if len(arglist) != 1:
            print('\nExpected one argument: Name of state machine\n')
            return None
        try:
            return self._get_ism_proxy(arglist[0])
        except dbus.exceptions.DBusException as e:
            print('\nNo such interactive state machine.\n')
            print('Error obtained: |%s|\n' % repr(e))
            return None


    def do_is_waiting(self, machine):
        """
        Determine if a machine is waiting for an advance call.

        The answer is printed, not returned.

        @param machine: Case sensitive name of the machine.
        @return: False, so that the command loop keeps running.

        """
        ism = self._get_state_machine(machine)
        if not ism:
            return False

        try:
            is_waiting = ism.IsWaiting(
                    dbus_interface=pm_constants.I_TESTING_ISM)
            print('\nState machine is %swaiting.\n' %
                  ('' if is_waiting else 'not '))
        except dbus.exceptions.DBusException as e:
            print('\nCould not determine if |%s| is waiting: |%s|\n' %
                  (machine, repr(e)))
        return False


    def help_is_waiting(self):
        """Handles the 'help is_waiting' command"""
        print('\nUsage: is_waiting <state-machine-name>\n'
              'Check whether a state machine is waiting for user action. The '
              'waiting machine can be advanced using the |advance| command.\n'
              'state-machine-name is the case sensitive name of the machine '
              'whose status is to be queried.\n')


    def do_advance(self, machine):
        """
        Advance the given state machine.

        Whether the machine could be advanced is printed, not returned.

        @param machine: Case sensitive name of the state machine to advance.
        @return: False, so that the command loop keeps running.

        """
        ism = self._get_state_machine(machine)
        if not ism:
            return False

        try:
            success = ism.Advance(dbus_interface=pm_constants.I_TESTING_ISM)
            print('\nAdvanced!\n' if success else '\nCould not advance.\n')
        except dbus.exceptions.DBusException as e:
            print('\nError while advancing state machine: |%s|\n' % repr(e))
        return False


    def help_advance(self):
        """Handles the 'help advance' command"""
        print('\nUsage: advance <state-machine-name>\n'
              'Advance a waiting state machine to the next step.\n'
              'state-machine-name is the case sensitive name of the machine '
              'to advance.\n')


    def do_exit(self, args):
        """
        Handles the 'exit' command.

        Asks for confirmation before leaving the shell. This method also
        serves the EOF command, see the aliases below.

        @param args: Must be empty; the command takes no arguments.
        @return: True if the shell should terminate, False otherwise.

        """
        if args:
            print('\nCommand "exit" expects no arguments.\n')
            return False
        try:
            resp = raw_input('Are you sure? (yes/no): ')
        except EOFError:
            # Standard input is exhausted, so no confirmation (and no further
            # command) can ever be read. Leave instead of crashing.
            print('\nGoodbye!\n')
            return True
        if resp == 'yes':
            print('\nGoodbye!\n')
            return True
        if resp != 'no':
            print('\nDid not understand: ' + resp + '\n')
        return False


    def help_exit(self):
        """Handles the 'help exit' command."""
        print('\nExits the interpreter. Shuts down the pseudo modem manager '
              'if the interpreter was launched by running pseudomodem.py')


    # cmd.Cmd dispatches end-of-file on input to the 'EOF' command; treat it
    # exactly like 'exit'.
    do_EOF = do_exit
    help_EOF = help_exit
283
284
def main():
    """ Stand-alone entry point: builds the client and runs its shell. """
    PseudoModemClient().Begin()
289
290
# Launch the shell only when this file is executed as a script; importing the
# module must not start it.
if __name__ == "__main__":
    main()
293