#!/usr/bin/env python

# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import errno
import logging
import os
import unittest

import mox

import at_channel
import at_transceiver
import modem_configuration
import task_loop
import wardmodem_exceptions as wme
18
class ATTransceiverTestCase(unittest.TestCase):
    """
    Base test fixture for ATTransceiver class.

    setUp builds an ATTransceiver on top of a temporary pty pair and then
    replaces its two channels and its task loop with mox mocks, so that test
    cases deriving from this fixture can record expectations on them.

    """
    class TestMachine(object):
        """ Stub test machine used by tests below. """
        def test_function(self, _):
            """
            A stub StateMachine API function.

            wardmodem calls will be placed to this function.

            @param _: Ignored.

            """
            pass


        # Needed in a test machine.
        def get_well_known_name(self):
            """ Get the well known name of this machine as str. """
            return "test_machine"


    @staticmethod
    def _close_fd(fd):
        """
        Close a file descriptor, tolerating one that is already closed.

        @param fd: The file descriptor (int) to close.

        @raises OSError: If closing fails for any reason other than EBADF.

        """
        try:
            os.close(fd)
        except OSError as e:
            # EBADF only means somebody else closed the descriptor first,
            # which is fine for clean up purposes.
            if e.errno != errno.EBADF:
                raise


    def setUp(self):
        """
        Create the mox instance and an ATTransceiver wired up with mocks.

        """
        self._mox = mox.Mox()

        # Create a temporary pty pair for the ATTransceiver constructor.
        # Both ends are closed again once the test is done (even when setUp or
        # the test itself fails), so that descriptors do not pile up over the
        # whole test run.
        master, slave = os.openpty()
        self.addCleanup(self._close_fd, master)
        self.addCleanup(self._close_fd, slave)

        self._modem_conf = modem_configuration.ModemConfiguration()
        self._at_transceiver = at_transceiver.ATTransceiver(slave,
                                                            self._modem_conf,
                                                            slave)

        # Now replace internal objects in _at_transceiver with mocks
        self._at_transceiver._modem_response_timeout_milliseconds = 0
        self._mock_modem_channel = self._mox.CreateMock(at_channel.ATChannel)
        self._at_transceiver._modem_channel = self._mock_modem_channel
        self._mock_mm_channel = self._mox.CreateMock(at_channel.ATChannel)
        self._at_transceiver._mm_channel = self._mock_mm_channel
        self._mock_task_loop = self._mox.CreateMock(task_loop.TaskLoop)
        self._at_transceiver._task_loop = self._mock_task_loop

        # Also empty out the internal maps, so that actual loaded configuration
        # does not interfere with the test.
        self._at_transceiver._at_to_wm_action_map = {}
        self._at_transceiver._wm_response_to_at_map = {}
68
69
class ATTransceiverCommonTestCase(ATTransceiverTestCase):
    """
    Tests for ATTransceiver behaviour that is shared by all three modes.

    """

    def test_successful_mode_selection(self):
        """
        Every mode can be selected while both channels are present.

        """
        modes = at_transceiver.ATTransceiverMode
        for mode in (modes.WARDMODEM, modes.PASS_THROUGH, modes.SPLIT_VERIFY):
            self._at_transceiver.mode = mode
            self.assertEqual(self._at_transceiver.mode, mode)


    def test_unsuccessful_mode_selection(self):
        """
        Without a modem channel, the mode stays WARDMODEM whatever mode is
        requested.

        """
        modes = at_transceiver.ATTransceiverMode
        self._at_transceiver._modem_channel = None
        for requested in (modes.WARDMODEM,
                          modes.PASS_THROUGH,
                          modes.SPLIT_VERIFY):
            self._at_transceiver.mode = requested
            self.assertEqual(self._at_transceiver.mode, modes.WARDMODEM)


    def test_update_at_to_wm_action_map(self):
        """
        Check the parsed contents of _at_to_wm_action_map, first after an
        initial load and then after an update on top of it.

        """
        # The diffs if this test fails can be rather long.
        self.maxDiff = None
        transceiver = self._at_transceiver
        transceiver._at_to_wm_action_map = {}

        # Initial load into an empty map.
        initial_raw = {'AT1=': ('STATE_MACHINE1', 'function1'),
                       'AT2=1,2': ('STATE_MACHINE2', 'function2'),
                       'AT3=*,care,do': ('STATE_MACHINE3', 'function3', (0, 1)),
                       'AT4?': ('STATE_MACHINE4', 'function4'),
                       'AT5=': ('STATE_MACHINE5', 'function5', ()),
                       'AT5=*': ('STATE_MACHINE6', 'function6')}
        expected = {
                'AT1=': {(): ('STATE_MACHINE1', 'function1', ())},
                'AT2=': {('1', '2'): ('STATE_MACHINE2', 'function2', ())},
                'AT3=': {('*', 'care', 'do'): ('STATE_MACHINE3', 'function3',
                                               (0, 1))},
                'AT4?': {(): ('STATE_MACHINE4', 'function4', ())},
                'AT5=': {(): ('STATE_MACHINE5', 'function5', ()),
                         ('*',): ('STATE_MACHINE6', 'function6', ())},
        }
        transceiver._update_at_to_wm_action_map(initial_raw)
        self.assertEqual(expected, transceiver._at_to_wm_action_map)

        # Update: one entry is overwritten, one gains an extra argument
        # pattern, and one is entirely new. Everything else stays as it was.
        update_raw = {'AT1=': ('STATE_MACHINE7', 'function7'),
                      'AT5=2': ('STATE_MACHINE8', 'function8', 0),
                      'AT6?': ('STATE_MACHINE9', 'function9')}
        expected['AT1='] = {(): ('STATE_MACHINE7', 'function7', ())}
        expected['AT5='][('2',)] = ('STATE_MACHINE8', 'function8', (0,))
        expected['AT6?'] = {(): ('STATE_MACHINE9', 'function9', ())}
        transceiver._update_at_to_wm_action_map(update_raw)
        self.assertEqual(expected, transceiver._at_to_wm_action_map)


    def test_find_wardmodem_action_for_at(self):
        """
        Load _at_to_wm_action_map and look up the wardmodem action for a number
        of AT commands in it.

        """
        raw_map = {'AT1=': ('STATE_MACHINE1', 'function1'),
                   'AT2=1,2': ('STATE_MACHINE2', 'function2'),
                   'AT3=*,b,c': ('STATE_MACHINE3', 'function3', (0, 1)),
                   'AT4?': ('STATE_MACHINE4', 'function4'),
                   'AT5=': ('STATE_MACHINE5', 'function5', ()),
                   'AT5=*': ('STATE_MACHINE6', 'function6')}
        self._at_transceiver._update_at_to_wm_action_map(raw_map)
        find_action = self._at_transceiver._find_wardmodem_action_for_at

        # (AT command received, action expected for it)
        successful_lookups = (
                ('AT1=', ('STATE_MACHINE1', 'function1', ())),
                ('AT2=1,2', ('STATE_MACHINE2', 'function2', ())),
                ('AT3=a,b,c', ('STATE_MACHINE3', 'function3', ('a', 'b'))),
                ('AT3=,b,c', ('STATE_MACHINE3', 'function3', ('', 'b'))),
                ('AT5=', ('STATE_MACHINE5', 'function5', ())),
                ('AT5=s', ('STATE_MACHINE6', 'function6', ())),
        )
        for at_command, expected_action in successful_lookups:
            self.assertEqual(expected_action, find_action(at_command))

        # Unsuccessful cases
        with self.assertRaises(wme.ATTransceiverException):
            find_action('DOESNOTEXIST')


    def test_find_wardmodem_action_for_at_returns_fallback(self):
        """
        With a fallback machine registered, an unmatched AT command resolves to
        the fallback machine's function, with the command as its argument.

        """
        fallback_machine = self._mox.CreateMock(self.TestMachine)
        fallback_machine.get_well_known_name().MultipleTimes().AndReturn(
                'FALLBACK_MACHINE')
        self._mox.ReplayAll()

        self._at_transceiver.register_state_machine(fallback_machine)
        machine_name = fallback_machine.get_well_known_name()
        self._at_transceiver.register_fallback_state_machine(machine_name,
                                                             'act_on')
        expected_action = ('FALLBACK_MACHINE', 'act_on', ('DOESNOTEXIST',))
        self.assertEqual(
                expected_action,
                self._at_transceiver._find_wardmodem_action_for_at(
                        'DOESNOTEXIST'))
        self._mox.VerifyAll()


    def test_post_wardmodem_request(self):
        """
        Posting a wardmodem request for a mapped AT command posts a task for
        the registered machine's function on the task loop.

        """
        arg = 'fake_arg'
        command = 'AT=%s' % arg
        self._at_transceiver._update_at_to_wm_action_map(
                {'AT=*': ('TestMachine', 'test_function', 0)})

        machine = self._mox.CreateMock(self.TestMachine)
        machine.get_well_known_name().AndReturn('TestMachine')
        self._mock_task_loop.post_task(
                self._at_transceiver._execute_state_machine_function,
                command,
                mox.IgnoreArg(),
                machine.test_function,
                arg)

        self._mox.ReplayAll()
        self._at_transceiver.register_state_machine(machine)
        self._at_transceiver._post_wardmodem_request(command)
        self._mox.VerifyAll()


    def test_update_wm_response_to_at_map(self):
        """
        Check _wm_response_to_at_map after an initial load and after an update
        that overwrites one entry and adds another.

        """
        transceiver = self._at_transceiver

        first_batch = {'some_function': 'AT=some_function',
                       'some_other_function': 'AT=some_other_function'}
        transceiver._update_wm_response_to_at_map(first_batch)
        self.assertEqual(first_batch, transceiver._wm_response_to_at_map)

        second_batch = {'some_other_function': 'AT=overwritten_function',
                        'some_new_function': 'AT=this_is_new_too'}
        expected_after_update = {
                'some_function': 'AT=some_function',
                'some_other_function': 'AT=overwritten_function',
                'some_new_function': 'AT=this_is_new_too',
        }
        transceiver._update_wm_response_to_at_map(second_batch)
        self.assertEqual(expected_after_update,
                         transceiver._wm_response_to_at_map)


    def test_construct_at_response(self):
        """
        Check the AT responses built by _construct_at_response from a response
        template and the arguments given to it.

        """
        construct = self._at_transceiver._construct_at_response

        # (expected response, positional arguments for the call)
        successful_cases = (
                ('AT=arg1,some,arg2', ('AT=*,some,*', 'arg1', 'arg2')),
                ('AT=1,some,thing', ('AT=*,some,thing', 1)),
                ('AT=some,other,thing', ('AT=some,other,thing',)),
                ('AT=needsnone', ('AT=needsnone', 'butonegiven')),
        )
        for expected_response, call_args in successful_cases:
            self.assertEqual(expected_response, construct(*call_args))

        # Unsuccessful cases
        with self.assertRaises(wme.ATTransceiverException):
            construct('AT=*,needstwo,*', 'onlyonegiven')


    def test_process_wardmodem_response(self):
        """
        A basic test for process_wardmodem_response: the constructed AT
        response is handed to _process_wardmodem_at_command.

        """
        transceiver = self._at_transceiver
        self._mox.StubOutWithMock(transceiver,
                                  '_process_wardmodem_at_command')
        transceiver._update_wm_response_to_at_map(
                {'func1': 'AT=*,given,*',
                 'func2': 'AT=nothing,needed'})

        # Record the calls expected on the stubbed out function.
        transceiver._process_wardmodem_at_command('AT=a,given,2')
        transceiver._process_wardmodem_at_command('AT=nothing,needed')

        self._mox.ReplayAll()
        transceiver.process_wardmodem_response('func1', 'a', 2)
        transceiver.process_wardmodem_response('func2')
        self._mox.UnsetStubs()
        self._mox.VerifyAll()
308
309
class ATTransceiverWardModemTestCase(ATTransceiverTestCase):
    """
    Tests for ATTransceiver running in the WARDMODEM mode.

    """

    # AT command used as payload by the tests in this fixture.
    _AT_COMMAND = 'AT+commmmmmmmmand'

    def setUp(self):
        super(ATTransceiverWardModemTestCase, self).setUp()
        wardmodem_mode = at_transceiver.ATTransceiverMode.WARDMODEM
        self._at_transceiver.mode = wardmodem_mode


    def test_wardmodem_at_command(self):
        """
        An AT command coming from wardmodem is sent on the modem manager
        channel.

        """
        self._mock_mm_channel.send(self._AT_COMMAND)

        self._mox.ReplayAll()
        self._at_transceiver._process_wardmodem_at_command(self._AT_COMMAND)
        self._mox.VerifyAll()


    def test_mm_at_command(self):
        """
        An AT command coming from modem manager is posted as a wardmodem
        request.

        """
        transceiver = self._at_transceiver
        self._mox.StubOutWithMock(transceiver, '_post_wardmodem_request')

        # Expected call on the stubbed out function.
        transceiver._post_wardmodem_request(self._AT_COMMAND)

        self._mox.ReplayAll()
        transceiver._process_mm_at_command(self._AT_COMMAND)
        self._mox.UnsetStubs()
        self._mox.VerifyAll()
349
350
class ATTransceiverPassThroughTestCase(ATTransceiverTestCase):
    """
    Tests for ATTransceiver running in the PASS_THROUGH mode.

    """

    # AT command used as payload by the tests in this fixture.
    _AT_COMMAND = 'AT+commmmmmmmmand'

    def setUp(self):
        super(ATTransceiverPassThroughTestCase, self).setUp()
        pass_through_mode = at_transceiver.ATTransceiverMode.PASS_THROUGH
        self._at_transceiver.mode = pass_through_mode


    def test_modem_at_command(self):
        """
        An AT command coming from the physical modem is sent on the modem
        manager channel.

        """
        self._mock_mm_channel.send(self._AT_COMMAND)

        self._mox.ReplayAll()
        self._at_transceiver._process_modem_at_command(self._AT_COMMAND)
        self._mox.VerifyAll()


    def test_mm_at_command(self):
        """
        An AT command coming from modem manager is sent on the physical modem
        channel.

        """
        self._mock_modem_channel.send(self._AT_COMMAND)

        self._mox.ReplayAll()
        self._at_transceiver._process_mm_at_command(self._AT_COMMAND)
        self._mox.VerifyAll()
387
388
class ATTransceiverSplitVerifyTestCase(ATTransceiverTestCase):
    """
    Tests for ATTransceiver running in the SPLIT_VERIFY mode.

    """

    def setUp(self):
        super(ATTransceiverSplitVerifyTestCase, self).setUp()
        split_verify_mode = at_transceiver.ATTransceiverMode.SPLIT_VERIFY
        self._at_transceiver.mode = split_verify_mode


    def _expect_timeout_task_posted(self, task_id):
        """
        Record the expectation that the modem response timeout is posted on the
        task loop as a delayed task.

        @param task_id: The task id the mock task loop returns for that task.

        """
        self._mock_task_loop.post_task_after_delay(
                self._at_transceiver._modem_response_timed_out,
                mox.IgnoreArg()).AndReturn(task_id)


    def test_mm_at_command(self):
        """
        An incoming modem manager command goes both to wardmodem and to the
        physical modem, in either order.

        """
        command = 'AT+commmmmmmmmand'
        transceiver = self._at_transceiver
        self._mox.StubOutWithMock(transceiver, '_post_wardmodem_request')
        self._mock_modem_channel.send(command).InAnyOrder()
        transceiver._post_wardmodem_request(command).InAnyOrder()

        self._mox.ReplayAll()
        transceiver._process_mm_at_command(command)
        self._mox.UnsetStubs()
        self._mox.VerifyAll()


    def test_successful_single_at_response_modem_wardmodem(self):
        """
        One AT response arrives from both sides, the physical modem's first.
        It is sent on the modem manager channel.

        """
        command = 'AT+commmmmmmmmand'
        self._mock_mm_channel.send(command)

        self._mox.ReplayAll()
        self._at_transceiver._process_modem_at_command(command)
        self._at_transceiver._process_wardmodem_at_command(command)
        self._mox.VerifyAll()


    def test_successful_single_at_response_wardmodem_modem(self):
        """
        One AT response arrives from both sides, wardmodem's first.
        A timeout task is posted and then cancelled, and the response is sent
        on the modem manager channel.

        """
        command = 'AT+commmmmmmmmand'
        timeout_task_id = 3
        self._expect_timeout_task_posted(timeout_task_id)
        self._mock_task_loop.cancel_posted_task(timeout_task_id)
        self._mock_mm_channel.send(command)

        self._mox.ReplayAll()
        self._at_transceiver._process_wardmodem_at_command(command)
        self._at_transceiver._process_modem_at_command(command)
        self._mox.VerifyAll()


    def test_mismatched_at_response(self):
        """
        Both responses arrive but differ: a mismatch failure is reported and
        the wardmodem response is sent on the modem manager channel.

        """
        from_wardmodem = 'AT+wardmodem'
        from_modem = 'AT+modem'
        transceiver = self._at_transceiver
        self._mox.StubOutWithMock(transceiver, '_report_verification_failure')
        transceiver._report_verification_failure(
                transceiver.VERIFICATION_FAILED_MISMATCH,
                from_modem,
                from_wardmodem)
        self._mock_mm_channel.send(from_wardmodem)

        self._mox.ReplayAll()
        transceiver._process_modem_at_command(from_modem)
        transceiver._process_wardmodem_at_command(from_wardmodem)
        self._mox.UnsetStubs()
        self._mox.VerifyAll()


    def test_modem_response_times_out(self):
        """
        The physical modem never responds: a time out failure is reported and
        the wardmodem response is sent on the modem manager channel.

        """
        command = 'AT+commmmmmmmmand'
        timeout_task_id = 3
        transceiver = self._at_transceiver
        self._mox.StubOutWithMock(transceiver, '_report_verification_failure')

        self._expect_timeout_task_posted(timeout_task_id)
        transceiver._report_verification_failure(
                transceiver.VERIFICATION_FAILED_TIME_OUT,
                None,
                command)
        self._mock_mm_channel.send(command)

        self._mox.ReplayAll()
        transceiver._process_wardmodem_at_command(command)
        transceiver._modem_response_timed_out()
        self._mox.UnsetStubs()
        self._mox.VerifyAll()


    def test_multiple_successful_responses(self):
        """
        Two wardmodem responses are queued first, and then the two matching
        modem responses are received.

        """
        # (AT command, id of the timeout task posted for it)
        queued = (('AT+first', 3), ('AT+second', 4))

        for command, timeout_task_id in queued:
            self._expect_timeout_task_posted(timeout_task_id)
            self._mock_task_loop.cancel_posted_task(timeout_task_id)
            self._mock_mm_channel.send(command)

        self._mox.ReplayAll()
        for command, _ in queued:
            self._at_transceiver._process_wardmodem_at_command(command)
        for command, _ in queued:
            self._at_transceiver._process_modem_at_command(command)
        self._mox.VerifyAll()
529
530
if __name__ == '__main__':
    # Only when run as a script: log at DEBUG level, then run the tests.
    logging.basicConfig(level=logging.DEBUG)
    unittest.main()
534