# Lint as: python2, python3
# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import json, logging, os, pwd, shutil, subprocess, time

import dbus

import common
from autotest_lib.client.bin import utils
from autotest_lib.client.common_lib import error
from autotest_lib.client.cros import semiauto_framework
from autotest_lib.client.cros.power import sys_power

_USER_TIMEOUT_TIME = 321  # Seconds a tester has to respond to prompts
_DEVICE_TIMEOUT_TIME = 321  # Seconds a tester has to pair or connect device
_ADAPTER_INTERFACE = 'org.bluez.Adapter1'  # Name of adapter in DBus interface
_DEVICE_INTERFACE = 'org.bluez.Device1'  # Name of a device in DBus interface
_TIME_FORMAT = '%d %b %Y %H:%M:%S'  # Human-readable time format for logs
_SECTION_BREAK = '='*75


class BluetoothSemiAutoHelper(semiauto_framework.semiauto_test):
    """Generic Bluetooth SemiAutoTest.

    Contains functions needed to implement an actual Bluetooth SemiAutoTest,
    such as accessing the state of Bluetooth adapter/devices via dbus,
    opening dialogs with tester via Telemetry browser, and getting log data.
    """
    version = 1

    # Boards without Bluetooth support.
    _INVALID_BOARDS = ['x86-alex', 'x86-alex_he', 'lumpy']

    def _err(self, message):
        """Raise error after first collecting more information.

        Log collection is best-effort: if it fails, the failure is logged
        and the original error is still raised, so the caller's message is
        never replaced by a log-collection error.

        @param message: error message to raise and add to logs.

        @raises error.TestError: always, carrying the given message.

        """
        try:
            self.collect_logs('ERROR HAS OCCURED: %s' % message)
        except Exception:
            # Do not let a dbus/hciconfig failure hide the real error.
            logging.exception('Could not collect logs for error: %s',
                              message)
        raise error.TestError(message)

    def supports_bluetooth(self):
        """Return True if this device has Bluetooth capabilities; else False."""
        device = utils.get_board()
        if device in self._INVALID_BOARDS:
            logging.info('%s does not have Bluetooth.', device)
            return False
        return True

    def _get_objects(self):
        """Return the managed objects for this chromebook."""
        manager = dbus.Interface(
                self._bus.get_object('org.bluez', '/'),
                dbus_interface='org.freedesktop.DBus.ObjectManager')
        return manager.GetManagedObjects()

    def _get_adapter_info(self):
        """Return the adapter interface objects, or None if not found.

        As a side effect, remembers the object path of the adapter that was
        found in self._adapter_path (used by set_adapter_power).

        """
        objects = self._get_objects()
        for path, interfaces in objects.items():
            if _ADAPTER_INTERFACE in interfaces:
                self._adapter_path = path
                return interfaces[_ADAPTER_INTERFACE]
        return None

    def _get_device_info(self, addr):
        """Return the device interface objects, or None if not found.

        @param addr: address compared against each device's 'Address'.

        """
        for interfaces in self._get_objects().values():
            if _DEVICE_INTERFACE in interfaces:
                if interfaces[_DEVICE_INTERFACE]['Address'] == addr:
                    return interfaces[_DEVICE_INTERFACE]
        return None

    def _verify_adapter_power(self, adapter_power_status):
        """Return True/False if adapter power status matches given value.

        @param adapter_power_status: expected value of the 'Powered' property.

        @raises error.TestError: if no adapter is found.

        """
        info = self._get_adapter_info()
        if not info:
            self._err('No adapter found!')
        return info['Powered'] == adapter_power_status

    def _verify_device_connection(self, addr, paired_status=True,
                                  connected_status=True):
        """Return True/False if device statuses match given values.

        @param addr: address of the device to check.
        @param paired_status: expected value of the 'Paired' property.
        @param connected_status: expected value of the 'Connected' property.

        """
        def _check_info():
            info = self._get_device_info(addr)
            if info:
                return (info['Paired'] == paired_status and
                        info['Connected'] == connected_status)
            # Return True if no entry was found for an unpaired device
            return not paired_status and not connected_status

        results = _check_info()

        # To avoid spotting brief connections, sleep and check again.
        if results:
            time.sleep(0.5)
            results = _check_info()
        return results

    def set_adapter_power(self, adapter_power_status):
        """Set adapter power status to match given value via dbus call.

        Block until the power is set.

        @param adapter_power_status: True to turn adapter on; False for off.

        @raises error.TestError: if no adapter is found.

        """
        # _get_adapter_info() also refreshes self._adapter_path used below.
        info = self._get_adapter_info()
        if not info:
            self._err('No adapter found!')
        properties = dbus.Interface(
                self._bus.get_object('org.bluez', self._adapter_path),
                dbus_interface='org.freedesktop.DBus.Properties')
        properties.Set(_ADAPTER_INTERFACE, 'Powered', adapter_power_status)

        self.poll_adapter_power(adapter_power_status)

    def poll_adapter_presence(self):
        """Raise error if adapter is not found after some time."""
        complete = lambda: self._get_adapter_info() is not None
        try:
            utils.poll_for_condition(
                    condition=complete, timeout=15, sleep_interval=1)
        except utils.TimeoutError:
            self._err('No adapter found after polling!')

    def poll_adapter_power(self, adapter_power_status=True):
        """Wait until adapter power status matches given value.

        @param adapter_power_status: True for adapter is on; False for off.

        """
        complete = lambda: self._verify_adapter_power(
                adapter_power_status=adapter_power_status)
        adapter_str = 'ON' if adapter_power_status else 'OFF'
        utils.poll_for_condition(
                condition=complete, timeout=_DEVICE_TIMEOUT_TIME,
                sleep_interval=1,
                desc=('Timeout for Bluetooth Adapter to be %s' % adapter_str))

    def _poll_connection(self, addr, paired_status, connected_status):
        """Wait until device statuses match given values.

        @param addr: address of the device to wait for.
        @param paired_status: True for device paired; False for unpaired.
        @param connected_status: True for device connected; False for not.

        """
        paired_str = 'PAIRED' if paired_status else 'NOT PAIRED'
        conn_str = 'CONNECTED' if connected_status else 'NOT CONNECTED'
        message = 'Waiting for device %s to be %s and %s' % (addr, paired_str,
                                                             conn_str)
        logging.info(message)

        complete = lambda: self._verify_device_connection(
                addr, paired_status=paired_status,
                connected_status=connected_status)
        utils.poll_for_condition(
                condition=complete, timeout=_DEVICE_TIMEOUT_TIME,
                sleep_interval=1, desc=('Timeout while %s' % message))

    def poll_connections(self, paired_status=True, connected_status=True):
        """Wait until all Bluetooth devices have the given statuses.

        @param paired_status: True for device paired; False for unpaired.
        @param connected_status: True for device connected; False for not.

        """
        for addr in self._addrs:
            self._poll_connection(addr, paired_status=paired_status,
                                  connected_status=connected_status)

    def login_and_open_browser(self):
        """Log in to machine, open browser, and navigate to dialog template.

        Assumes the existence of 'client/cros/audio/music.mp3' file, and will
        fail if not found.

        @raises error.TestError: if no mounted home directory is found.

        """
        # Open browser and interactive tab
        self.login_and_open_interactive_tab()

        # Find mounted home directory
        user_home = None
        home_root = os.path.join('/', 'home', 'user')
        for udir in os.listdir(home_root):
            d = os.path.join(home_root, udir)
            if os.path.ismount(d):
                user_home = d
        if user_home is None:
            raise error.TestError('Could not find mounted home directory')

        # Setup Audio File
        audio_dir = os.path.join(self.bindir, '..', '..', 'cros', 'audio')
        loop_file = os.path.join(audio_dir, 'loop.html')
        music_file = os.path.join(audio_dir, 'music.mp3')
        dl_dir = os.path.join(user_home, 'Downloads')
        self._added_loop_file = os.path.join(dl_dir, 'loop.html')
        self._added_music_file = os.path.join(dl_dir, 'music.mp3')
        shutil.copyfile(loop_file, self._added_loop_file)
        shutil.copyfile(music_file, self._added_music_file)
        # Hand the copied files over to the 'chronos' user.
        chronos = pwd.getpwnam('chronos')
        uid = chronos.pw_uid
        gid = chronos.pw_gid
        os.chmod(self._added_loop_file, 0o755)
        os.chmod(self._added_music_file, 0o755)
        os.chown(self._added_loop_file, uid, gid)
        os.chown(self._added_music_file, uid, gid)

        # Open Test Dialog tab, Settings tab, and Audio file
        self._settings_tab = self._browser.tabs.New()
        self._settings_tab.Navigate('chrome://settings/search#Bluetooth')
        music_tab = self._browser.tabs.New()
        music_tab.Navigate('file:///home/chronos/user/Downloads/loop.html')

    def ask_user(self, message):
        """Ask the user a yes or no question in an open tab.

        Reset dialog page to be a question (message param) with 'PASS' and
        'FAIL' buttons. Wait for answer. If no, ask for more information.

        @param message: string sent to the user via browser interaction.

        @raises error.TestError: if the user answers FAIL, or if the dialog
                returns a value other than 0 (PASS) or 1 (FAIL).

        """
        logging.info('Asking user "%s"', message)
        sandbox = 'SANDBOX:<input type="text"/>'
        html = '<h3>%s</h3>%s' % (message, sandbox)
        self.set_tab_with_buttons(html, buttons=['PASS', 'FAIL'])

        # Interpret results.
        result = self.wait_for_tab_result(timeout=_USER_TIMEOUT_TIME)
        if result == 1:
            # Ask for more information on error.
            html = '<h3>Please provide more info:</h3>'
            self.set_tab_with_textbox(html)

            # Get explanation of error, clear output, and raise error.
            result = self.wait_for_tab_result(timeout=_USER_TIMEOUT_TIME)
            self.clear_output()
            self._err('Testing %s. "%s".' % (self._test_type, result))
        elif result != 0:
            raise error.TestError('Bad dialog value: %s' % result)
        logging.info('Answer was PASS')

        # Clear user screen.
        self.clear_output()

    def tell_user(self, message):
        """Tell the user the given message in an open tab.

        @param message: the text string to be displayed.

        """
        logging.info('Telling user "%s"', message)
        html = '<h3>%s</h3>' % message
        self.set_tab(html)

    def check_working(self, message=None):
        """Steps to check that all devices are functioning.

        Ask user to connect all devices, verify connections, and ask for
        user input if they are working.

        @param message: string of text the user is asked. Defaults to asking
                        the user to connect all devices.

        """
        if not message:
            message = ('Please connect all devices.<br>(You may need to '
                       'click mice, press keyboard keys, or use the '
                       'Connect button in Settings.)')
        self.tell_user(message)
        self.poll_adapter_power(True)
        self.poll_connections(paired_status=True, connected_status=True)
        self.ask_user('Are all Bluetooth devices working?<br>'
                      'Is audio playing only through Bluetooth devices?<br>'
                      'Do onboard keyboard and trackpad work?')

    def ask_not_working(self):
        """Ask the user pre-defined message about NOT working."""
        self.ask_user('No Bluetooth devices work.<br>Audio is NOT playing '
                      'through onboard speakers or wired headphones.')

    def _stop_dump(self):
        """Stop and reap the btmon subprocess, if one was started."""
        dump = getattr(self, '_dump', None)
        if dump is None:
            return
        # Only signal a process that is still running; an already-reaped
        # pid must not be signalled again.
        if dump.poll() is None:
            dump.kill()
            dump.wait()
        if dump.stderr is not None:
            dump.stderr.close()
        self._dump = None

    def start_dump(self, message=''):
        """Run btmon in subprocess.

        Kill previous btmon (if needed) and start new one using current
        test type as base filename. Dumps stored in results folder.

        @param message: string of text added to top of log entry.

        @raises error.TestError: if btmon could not be started.

        """
        self._stop_dump()
        if not hasattr(self, '_test_type'):
            self._test_type = 'test'
        logging.info('Starting btmon')
        filename = '%s_btmon' % self._test_type
        path = os.path.join(self.resultsdir, filename)
        with open(path, 'a') as f:
            f.write('%s\n' % _SECTION_BREAK)
            f.write('%s: Starting btmon\n' % time.strftime(_TIME_FORMAT))
            f.write('%s\n' % message)
            # Flush the header before btmon starts writing to the same file.
            f.flush()
            btmon_path = '/usr/bin/btmon'
            try:
                # NOTE(review): stderr is piped but never read in this file.
                self._dump = subprocess.Popen([btmon_path], stdout=f,
                                              stderr=subprocess.PIPE)
            except Exception as e:
                raise error.TestError('btmon: %s' % e)

    def collect_logs(self, message=''):
        """Store results of dbus GetManagedObjects and hciconfig.

        Use current test type as base filename. Stored in results folder.

        @param message: string of text added to top of log entry.

        @raises error.TestError: if running hciconfig fails.

        """
        logging.info('Collecting dbus info')
        if not hasattr(self, '_test_type'):
            self._test_type = 'test'
        filename = '%s_dbus' % self._test_type
        path = os.path.join(self.resultsdir, filename)
        with open(path, 'a') as f:
            f.write('%s\n' % _SECTION_BREAK)
            f.write('%s: %s\n' % (time.strftime(_TIME_FORMAT), message))
            f.write(json.dumps(list(self._get_objects().items()), indent=2))
            f.write('\n')

        logging.info('Collecting hciconfig info')
        filename = '%s_hciconfig' % self._test_type
        path = os.path.join(self.resultsdir, filename)
        with open(path, 'a') as f:
            f.write('%s\n' % _SECTION_BREAK)
            f.write('%s: %s\n' % (time.strftime(_TIME_FORMAT), message))
            # Flush the header before hciconfig writes to the same file.
            f.flush()
            hciconfig_path = '/usr/bin/hciconfig'
            try:
                subprocess.check_call([hciconfig_path, '-a'], stdout=f)
            except Exception as e:
                raise error.TestError('hciconfig: %s' % e)

    def os_idle_time_set(self, reset=False):
        """Function to set short idle time or to reset to normal.

        Not using sys_power so that user can use Bluetooth to wake machine.

        @param reset: true to reset to normal idle time, false for short.

        @raises error.TestError: if the command fails.

        """
        powerd_path = '/usr/bin/set_short_powerd_timeouts'
        # Only add the flag when resetting, so no empty argument is passed.
        cmd = [powerd_path]
        if reset:
            cmd.append('--reset')
        try:
            subprocess.check_call(cmd)
        except Exception as e:
            raise error.TestError('idle cmd: %s' % e)

    def os_suspend(self):
        """Function to suspend ChromeOS using sys_power."""
        sys_power.do_suspend(5)

        # Sleep
        time.sleep(5)

    def initialize(self):
        """Connect to the system DBus used by the Bluetooth helpers."""
        self._bus = dbus.SystemBus()

    def warmup(self, addrs='', test_phase='client', close_browser=True):
        """Warmup setting parameters for semi-automated Bluetooth Test.

        Actual test steps are implemented in run_once() function.

        @param: addrs: list of MAC address of Bluetooth devices under test.
        @param: test_phase: for use by server side tests to, for example, call
                            the same test before and after a reboot.
        @param: close_browser: True if client side test should close browser
                               at end of test.

        """
        self.login_and_open_browser()

        self._addrs = addrs
        self._test_type = 'start'
        self._test_phase = test_phase
        self._will_close_browser = close_browser

    def cleanup(self):
        """Cleanup of various files/processes opened during test.

        Closes running btmon, closes browser (if asked to at start), and
        deletes files added during test.

        """
        self._stop_dump()
        if getattr(self, '_will_close_browser', False):
            self.close_browser()
        # Attributes are only set once login_and_open_browser got that far.
        for attr in ('_added_loop_file', '_added_music_file'):
            added_file = getattr(self, attr, None)
            if added_file and os.path.exists(added_file):
                os.remove(added_file)