1# Copyright (c) 2012 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5import gobject 6import logging 7import time 8 9from autotest_lib.client.bin import test 10from autotest_lib.client.common_lib import error 11from autotest_lib.client.cros.cellular import modem_utils 12from autotest_lib.client.cros.mainloop import ExceptionForward 13from autotest_lib.client.cros.mainloop import GenericTesterMainLoop 14from autotest_lib.client.cros.networking import shill_proxy 15 16DEFAULT_TEST_TIMEOUT_S = 600 17 18 19class DisableTester(GenericTesterMainLoop): 20 """Base class containing main test logic.""" 21 def __init__(self, *args, **kwargs): 22 super(DisableTester, self).__init__(*args, **kwargs) 23 24 @ExceptionForward 25 def perform_one_test(self): 26 """Called by GenericMainTesterMainLoop to execute the test.""" 27 self._configure() 28 disable_delay_ms = ( 29 self.test_kwargs.get('delay_before_disable_ms', 0) + 30 self.test.iteration * 31 self.test_kwargs.get('disable_delay_per_iteration_ms', 0)) 32 gobject.timeout_add(disable_delay_ms, self._start_disable) 33 self._start_test() 34 35 @ExceptionForward 36 def _connect_success_handler(self, *ignored_args): 37 logging.info('connect succeeded') 38 self.requirement_completed('connect') 39 40 @ExceptionForward 41 def _connect_error_handler(self, e): 42 # We disabled while connecting; error is OK 43 logging.info('connect errored: %s', e) 44 self.requirement_completed('connect') 45 46 @ExceptionForward 47 def _start_disable(self): 48 logging.info('disabling') 49 self.disable_start = time.time() 50 self._enable(False) 51 52 @ExceptionForward 53 def _disable_success_handler(self): 54 disable_elapsed = time.time() - self.disable_start 55 self.requirement_completed('disable') 56 57 @ExceptionForward 58 def _get_status_success_handler(self, status): 59 logging.info('Got status') 60 self.requirement_completed('get_status', warn_if_already_completed=False) 61 if self.status_delay_ms: 62 gobject.timeout_add(self.status_delay_ms, self._start_get_status) 63 64 def after_main_loop(self): 65 """Called by GenericTesterMainLoop after the main loop has exited.""" 66 enabled = self._enabled() 67 logging.info('Modem enabled: %s', enabled) 68 # Will return happily if no Gobi present 69 modem_utils.ClearGobiModemFaultInjection() 70 71 72class ShillDisableTester(DisableTester): 73 """Tests that disable-while-connecting works at the shill level. 74 Expected control flow: 75 76 * self._configure() called; registers self._disable_property_changed 77 to be called when device is en/disabled 78 79 * Parent class sets a timer that calls self._enable(False) when it expires. 80 81 * _start_test calls _start_connect() which sends a connect request to 82 the device. 83 84 * we wait for the modem to power off, at which point 85 _disable_property_changed (registered above) will get called 86 87 * _disable_property_changed() completes the 'disable' requirement, 88 and we're done. 89 90 """ 91 def __init__(self, *args, **kwargs): 92 super(ShillDisableTester, self).__init__(*args, **kwargs) 93 94 def _disable_property_changed(self, property, value, *args, **kwargs): 95 self._disable_success_handler() 96 97 def _start_test(self): 98 # We would love to add requirements based on connect, but in many 99 # scenarios, there is no observable response to a cancelled 100 # connect: We issue a connect, it returns instantly to let us know 101 # that the connect has started, but then the disable takes effect 102 # and the connect fails. We don't get a state change because no 103 # state change has happened: the modem never got to a different 104 # state before we cancelled 105 self.remaining_requirements = set(['disable']) 106 self._start_connect() 107 108 def _configure(self): 109 self.cellular_device = \ 110 self.test.test_env.shill.find_cellular_device_object() 111 if self.cellular_device is None: 112 raise error.TestError("Could not find cellular device") 113 114 self.cellular_service = \ 115 self.test.test_env.shill.find_cellular_service_object() 116 117 self.test.test_env.bus.add_signal_receiver( 118 self.dispatch_property_changed, 119 signal_name='PropertyChanged', 120 dbus_interface=self.cellular_device.dbus_interface, 121 path=self.cellular_device.object_path) 122 123 @ExceptionForward 124 def _expect_einprogress_handler(self, e): 125 pass 126 127 def _enable(self, value): 128 self.property_changed_actions['Powered'] = self._disable_property_changed 129 130 if value: 131 self.cellular_device.Enable( 132 reply_handler=self.ignore_handler, 133 error_handler=self._expect_einprogress_handler) 134 else: 135 self.cellular_device.Disable( 136 reply_handler=self.ignore_handler, 137 error_handler=self._expect_einprogress_handler) 138 139 @ExceptionForward 140 def _start_connect(self): 141 logging.info('connecting') 142 143 def _log_connect_event(property, value, *ignored_args): 144 logging.info('%s property changed: %s', property, value) 145 146 self.property_changed_actions['Connected'] = _log_connect_event 147 148 # Contrary to documentation, Connect just returns when it has 149 # fired off the lower-level dbus messages. So a success means 150 # nothing to us. But a failure means it didn't even try. 151 self.cellular_service.Connect( 152 reply_handler=self.ignore_handler, 153 error_handler=self.build_error_handler('Connect')) 154 155 def _enabled(self): 156 return self.test.test_env.shill.get_dbus_property( 157 self.cellular_device, 158 shill_proxy.ShillProxy.DEVICE_PROPERTY_POWERED) 159 160 161class ModemDisableTester(DisableTester): 162 """Tests that disable-while-connecting works at the modem-manager level. 163 164 Expected control flow: 165 166 * _configure() is called. 167 168 * Parent class sets a timer that calls self._enable(False) when it 169 expires. 170 171 * _start_test calls _start_connect() which sends a connect request to 172 the device, also sets a timer that calls GetStatus on the modem. 173 174 * wait for all three (connect, disable, get_status) to complete. 175 176 """ 177 def __init__(self, *args, **kwargs): 178 super(ModemDisableTester, self).__init__(*args, **kwargs) 179 180 def _is_gobi(self): 181 return 'Gobi' in self.test.test_env.modem.path 182 183 def _start_test(self): 184 self.remaining_requirements = set(['connect', 'disable']) 185 186 # Only cromo/gobi-cromo-plugin maintain the invariant that GetStatus 187 # will always succeed, so we only check it if the modem is a Gobi. 188 if self._is_gobi(): 189 self.remaining_requirements.add('get_status') 190 self.status_delay_ms = self.test_kwargs.get('status_delay_ms', 200) 191 gobject.timeout_add(self.status_delay_ms, self._start_get_status) 192 193 self._start_connect() 194 195 def _configure(self): 196 self.simple_modem = self.test.test_env.modem.SimpleModem() 197 198 logging.info('Modem path: %s', self.test.test_env.modem.path) 199 200 if self._is_gobi(): 201 self._configure_gobi() 202 else: 203 self._configure_non_gobi() 204 205 service = self.test.test_env.shill.wait_for_cellular_service_object() 206 if not service: 207 raise error.TestError('Modem failed to register with the network after ' 208 're-enabling.') 209 210 def _configure_gobi(self): 211 gobi_modem = self.test.test_env.modem.GobiModem() 212 213 if 'async_connect_sleep_ms' in self.test_kwargs: 214 sleep_ms = self.test_kwargs.get('async_connect_sleep_ms', 0) 215 logging.info('Sleeping %d ms before connect', sleep_ms) 216 gobi_modem.InjectFault('AsyncConnectSleepMs', sleep_ms) 217 218 if 'connect_fails_with_error_sending_qmi_request' in self.test_kwargs: 219 logging.info('Injecting QMI failure') 220 gobi_modem.InjectFault('ConnectFailsWithErrorSendingQmiRequest', 1) 221 222 def _configure_non_gobi(self): 223 # Check to make sure no Gobi-specific arguments were specified. 224 if 'async_connect_sleep_ms' in self.test_kwargs: 225 raise error.TestError('async_connect_sleep_ms on non-Gobi modem') 226 if 'connect_fails_with_error_sending_qmi_request' in self.test_kwargs: 227 raise error.TestError( 228 'connect_fails_with_error_sending_qmi_request on non-Gobi modem') 229 230 @ExceptionForward 231 def _start_connect(self): 232 logging.info('connecting') 233 234 retval = self.simple_modem.Connect( 235 {}, 236 reply_handler=self._connect_success_handler, 237 error_handler=self._connect_error_handler) 238 logging.info('connect call made. retval = %s', retval) 239 240 241 @ExceptionForward 242 def _start_get_status(self): 243 # Keep on calling get_status to make sure it works at all times 244 self.simple_modem.GetStatus( 245 reply_handler=self._get_status_success_handler, 246 error_handler=self.build_error_handler('GetStatus')) 247 248 def _enabled(self): 249 return self.test.test_env.modem.GetModemProperties().get('Enabled', -1) 250 251 def _enable(self, value): 252 self.test.test_env.modem.Enable( 253 value, 254 reply_handler=self._disable_success_handler, 255 error_handler=self.build_error_handler('Enable')) 256 257 258class cellular_DisableWhileConnecting(test.test): 259 """Check that the modem can handle a disconnect while connecting.""" 260 version = 1 261 262 def run_once(self, test_env, **kwargs): 263 self.test_env = test_env 264 timeout_s = kwargs.get('timeout_s', DEFAULT_TEST_TIMEOUT_S) 265 gobject_main_loop = gobject.MainLoop() 266 267 with test_env: 268 logging.info('Shill-level test') 269 shill_level_test = ShillDisableTester(self, 270 gobject_main_loop, 271 timeout_s=timeout_s) 272 shill_level_test.run(**kwargs) 273 274 with test_env: 275 try: 276 logging.info('Modem-level test') 277 modem_level_test = ModemDisableTester(self, 278 gobject_main_loop, 279 timeout_s=timeout_s) 280 modem_level_test.run(**kwargs) 281 finally: 282 modem_utils.ClearGobiModemFaultInjection() 283