# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import json, logging, os, pwd, shutil, subprocess, time

import dbus

from autotest_lib.client.bin import utils
from autotest_lib.client.common_lib import error
from autotest_lib.client.cros import semiauto_framework
from autotest_lib.client.cros.power import sys_power

_USER_TIMEOUT_TIME = 321  # Seconds a tester has to respond to prompts
_DEVICE_TIMEOUT_TIME = 321  # Seconds a tester has to pair or connect device
_ADAPTER_INTERFACE = 'org.bluez.Adapter1'  # Name of adapter in DBus interface
_DEVICE_INTERFACE = 'org.bluez.Device1'  # Name of a device in DBus interface
_TIME_FORMAT = '%d %b %Y %H:%M:%S'  # Human-readable time format for logs
_SECTION_BREAK = '='*75


class BluetoothSemiAutoHelper(semiauto_framework.semiauto_test):
    """Generic Bluetooth SemiAutoTest.

    Contains functions needed to implement an actual Bluetooth SemiAutoTest,
    such as accessing the state of Bluetooth adapter/devices via dbus,
    opening dialogs with tester via Telemetry browser, and getting log data.
    """
    version = 1

    # Boards without Bluetooth support.
    _INVALID_BOARDS = ['x86-alex', 'x86-alex_he', 'lumpy']

    def _err(self, message):
        """Raise error after first collecting more information.

        Log collection is best effort: if it fails, that failure is logged
        and the requested error is still raised, so the original message is
        never masked by a secondary dbus/hciconfig problem.

        @param message: error message to raise and add to logs.

        @raises error.TestError: always, carrying the given message.

        """
        try:
            self.collect_logs('ERROR HAS OCCURED: %s' % message)
        except Exception:
            logging.exception('Could not collect logs for error: %s', message)
        raise error.TestError(message)

    def supports_bluetooth(self):
        """Return True if this device has Bluetooth capabilities; else False."""
        board = utils.get_board()
        if board in self._INVALID_BOARDS:
            logging.info('%s does not have Bluetooth.', board)
            return False
        return True

    def _get_objects(self):
        """Return the managed objects for this chromebook.

        @returns the result of GetManagedObjects() on the 'org.bluez' root
                object: a mapping of object path to its interfaces.

        """
        manager = dbus.Interface(
                self._bus.get_object('org.bluez', '/'),
                dbus_interface='org.freedesktop.DBus.ObjectManager')
        return manager.GetManagedObjects()

    def _get_adapter_info(self):
        """Return the adapter interface objects, or None if not found.

        As a side effect, remembers the object path of the adapter that was
        found in self._adapter_path (used later by set_adapter_power()).

        """
        for path, interfaces in self._get_objects().items():
            if _ADAPTER_INTERFACE in interfaces:
                self._adapter_path = path
                return interfaces[_ADAPTER_INTERFACE]
        return None

    def _get_device_info(self, addr):
        """Return the device interface objects, or None if not found.

        @param addr: address of the device to look up; compared against the
                'Address' property of every device object.

        """
        for interfaces in self._get_objects().values():
            if _DEVICE_INTERFACE in interfaces:
                device = interfaces[_DEVICE_INTERFACE]
                if device['Address'] == addr:
                    return device
        return None

    def _verify_adapter_power(self, adapter_power_status):
        """Return True/False if adapter power status matches given value.

        @param adapter_power_status: expected value of the 'Powered' property.

        @raises error.TestError: if no adapter is found.

        """
        info = self._get_adapter_info()
        if not info:
            self._err('No adapter found!')
        return info['Powered'] == adapter_power_status

    def _verify_device_connection(self, addr, paired_status=True,
                                  connected_status=True):
        """Return True/False if device statuses match given values.

        @param addr: address of the device to check.
        @param paired_status: expected value of the 'Paired' property.
        @param connected_status: expected value of the 'Connected' property.

        """
        def _check_info():
            info = self._get_device_info(addr)
            if not info:
                # No entry at all only counts as a match when the device is
                # expected to be neither paired nor connected.
                return not paired_status and not connected_status
            return (info['Paired'] == paired_status and
                    info['Connected'] == connected_status)

        results = _check_info()

        # To avoid spotting brief connections, sleep and check again.
        if results:
            time.sleep(0.5)
            results = _check_info()
        return results

    def set_adapter_power(self, adapter_power_status):
        """Set adapter power status to match given value via dbus call.

        Block until the power is set.

        @param adapter_power_status: True to turn adapter on; False for off.

        @raises error.TestError: if no adapter is found.

        """
        info = self._get_adapter_info()
        if not info:
            self._err('No adapter found!')
        # self._adapter_path was recorded by _get_adapter_info() above.
        properties = dbus.Interface(
                self._bus.get_object('org.bluez', self._adapter_path),
                dbus_interface='org.freedesktop.DBus.Properties')
        properties.Set(_ADAPTER_INTERFACE, 'Powered', adapter_power_status)

        self.poll_adapter_power(adapter_power_status)

    def poll_adapter_presence(self):
        """Raise error if adapter is not found after some time.

        Polls once per second for up to 15 seconds.

        @raises error.TestError: if no adapter shows up in time.

        """
        complete = lambda: self._get_adapter_info() is not None
        try:
            utils.poll_for_condition(
                    condition=complete, timeout=15, sleep_interval=1)
        except utils.TimeoutError:
            self._err('No adapter found after polling!')

    def poll_adapter_power(self, adapter_power_status=True):
        """Wait until adapter power status matches given value.

        @param adapter_power_status: True for adapter is on; False for off.

        """
        complete = lambda: self._verify_adapter_power(
                adapter_power_status=adapter_power_status)
        adapter_str = 'ON' if adapter_power_status else 'OFF'
        utils.poll_for_condition(
                condition=complete, timeout=_DEVICE_TIMEOUT_TIME,
                sleep_interval=1,
                desc=('Timeout for Bluetooth Adapter to be %s' % adapter_str))

    def _poll_connection(self, addr, paired_status, connected_status):
        """Wait until device statuses match given values.

        @param addr: address of the device to wait for.
        @param paired_status: True for device paired; False for unpaired.
        @param connected_status: True for device connected; False for not.

        """
        paired_str = 'PAIRED' if paired_status else 'NOT PAIRED'
        conn_str = 'CONNECTED' if connected_status else 'NOT CONNECTED'
        message = 'Waiting for device %s to be %s and %s' % (addr, paired_str,
                                                             conn_str)
        logging.info(message)

        complete = lambda: self._verify_device_connection(
                addr, paired_status=paired_status,
                connected_status=connected_status)
        utils.poll_for_condition(
                condition=complete, timeout=_DEVICE_TIMEOUT_TIME,
                sleep_interval=1, desc=('Timeout while %s' % message))

    def poll_connections(self, paired_status=True, connected_status=True):
        """Wait until all Bluetooth devices have the given statuses.

        @param paired_status: True for device paired; False for unpaired.
        @param connected_status: True for device connected; False for not.

        """
        for addr in self._addrs:
            self._poll_connection(addr, paired_status=paired_status,
                                  connected_status=connected_status)

    def login_and_open_browser(self):
        """Log in to machine, open browser, and navigate to dialog template.

        Assumes the existence of the 'client/cros/audio/loop.html' and
        'client/cros/audio/music.mp3' files, and will fail if not found.

        @raises error.TestError: if no mounted home directory is found.

        """
        # Open browser and interactive tab
        self.login_and_open_interactive_tab()

        # Find mounted home directory (the last mounted entry wins).
        user_root = os.path.join('/', 'home', 'user')
        user_home = None
        for udir in os.listdir(user_root):
            d = os.path.join(user_root, udir)
            if os.path.ismount(d):
                user_home = d
        if user_home is None:
            raise error.TestError('Could not find mounted home directory')

        # Setup Audio File
        audio_dir = os.path.join(self.bindir, '..', '..', 'cros', 'audio')
        loop_file = os.path.join(audio_dir, 'loop.html')
        music_file = os.path.join(audio_dir, 'music.mp3')
        dl_dir = os.path.join(user_home, 'Downloads')
        self._added_loop_file = os.path.join(dl_dir, 'loop.html')
        self._added_music_file = os.path.join(dl_dir, 'music.mp3')
        shutil.copyfile(loop_file, self._added_loop_file)
        shutil.copyfile(music_file, self._added_music_file)

        # Hand the copies over to the 'chronos' user. The mode is written
        # as 0o755 so the literal is valid on both Python 2.6+ and Python 3.
        chronos = pwd.getpwnam('chronos')
        for added_file in (self._added_loop_file, self._added_music_file):
            os.chmod(added_file, 0o755)
        for added_file in (self._added_loop_file, self._added_music_file):
            os.chown(added_file, chronos.pw_uid, chronos.pw_gid)

        # Open Test Dialog tab, Settings tab, and Audio file
        self._settings_tab = self._browser.tabs.New()
        self._settings_tab.Navigate('chrome://settings/search#Bluetooth')
        music_tab = self._browser.tabs.New()
        music_tab.Navigate('file:///home/chronos/user/Downloads/loop.html')

    def ask_user(self, message):
        """Ask the user a yes or no question in an open tab.

        Reset dialog page to be a question (message param) with 'PASS' and
        'FAIL' buttons. Wait for answer. If no, ask for more information.

        @param message: string sent to the user via browser interaction.

        @raises error.TestError: if the user answers FAIL, or if the dialog
                returns a value that is neither PASS (0) nor FAIL (1).

        """
        logging.info('Asking user "%s"', message)
        sandbox = 'SANDBOX:<input type="text"/>'
        html = '<h3>%s</h3>%s' % (message, sandbox)
        self.set_tab_with_buttons(html, buttons=['PASS', 'FAIL'])

        # Interpret results: the value is the index of the pressed button.
        result = self.wait_for_tab_result(timeout=_USER_TIMEOUT_TIME)
        if result == 1:
            # Ask for more information on error.
            html = '<h3>Please provide more info:</h3>'
            self.set_tab_with_textbox(html)

            # Get explanation of error, clear output, and raise error.
            result = self.wait_for_tab_result(timeout=_USER_TIMEOUT_TIME)
            self.clear_output()
            self._err('Testing %s. "%s".' % (self._test_type, result))
        elif result != 0:
            raise error.TestError('Bad dialog value: %s' % result)
        logging.info('Answer was PASS')

        # Clear user screen.
        self.clear_output()

    def tell_user(self, message):
        """Tell the user the given message in an open tab.

        @param message: the text string to be displayed.

        """
        logging.info('Telling user "%s"', message)
        html = '<h3>%s</h3>' % message
        self.set_tab(html)

    def check_working(self, message=None):
        """Steps to check that all devices are functioning.

        Ask user to connect all devices, verify connections, and ask for
        user input if they are working.

        @param message: string of text the user is asked. Defaults to asking
                        the user to connect all devices.

        """
        if not message:
            message = ('Please connect all devices.<br>(You may need to '
                       'click mice, press keyboard keys, or use the '
                       'Connect button in Settings.)')
        self.tell_user(message)
        self.poll_adapter_power(True)
        self.poll_connections(paired_status=True, connected_status=True)
        self.ask_user('Are all Bluetooth devices working?<br>'
                      'Is audio playing only through Bluetooth devices?<br>'
                      'Do onboard keyboard and trackpad work?')

    def ask_not_working(self):
        """Ask the user pre-defined message about NOT working."""
        self.ask_user('No Bluetooth devices work.<br>Audio is NOT playing '
                      'through onboard speakers or wired headphones.')

    def _stop_dump(self):
        """Kill and reap the running btmon process, if there is one.

        Always leaves self._dump set to None so that a later call (e.g. from
        cleanup()) never signals a process that was already reaped.

        """
        dump = getattr(self, '_dump', None)
        self._dump = None
        if not dump:
            return
        try:
            dump.kill()
        except OSError as e:
            # The process is already gone; there is nothing left to kill.
            logging.info('btmon was not running: %s', e)
        # Reap the child so it does not linger as a zombie.
        dump.wait()

    def start_dump(self, message=''):
        """Run btmon in subprocess.

        Kill previous btmon (if needed) and start new one using current
        test type as base filename. Dumps stored in results folder.

        @param message: string of text added to top of log entry.

        @raises error.TestError: if btmon could not be started.

        """
        self._stop_dump()
        if not hasattr(self, '_test_type'):
            self._test_type = 'test'
        logging.info('Starting btmon')
        filename = '%s_btmon' % self._test_type
        path = os.path.join(self.resultsdir, filename)
        with open(path, 'a') as f:
            f.write('%s\n' % _SECTION_BREAK)
            f.write('%s: Starting btmon\n' % time.strftime(_TIME_FORMAT))
            f.write('%s\n' % message)
            # Flush our header before the child starts writing to the file.
            f.flush()
            btmon_path = '/usr/bin/btmon'
            try:
                # stderr is sent to the same file rather than to a pipe:
                # nothing ever reads a pipe here, so a full pipe buffer
                # could block btmon.
                self._dump = subprocess.Popen([btmon_path], stdout=f,
                                              stderr=subprocess.STDOUT)
            except Exception as e:
                raise error.TestError('btmon: %s' % e)

    def collect_logs(self, message=''):
        """Store results of dbus GetManagedObjects and hciconfig.

        Use current test type as base filename. Stored in results folder.

        @param message: string of text added to top of log entry.

        @raises error.TestError: if hciconfig could not be run successfully.

        """
        logging.info('Collecting dbus info')
        if not hasattr(self, '_test_type'):
            self._test_type = 'test'
        filename = '%s_dbus' % self._test_type
        path = os.path.join(self.resultsdir, filename)
        with open(path, 'a') as f:
            f.write('%s\n' % _SECTION_BREAK)
            f.write('%s: %s\n' % (time.strftime(_TIME_FORMAT), message))
            # list() keeps this serializable where items() returns a view.
            f.write(json.dumps(list(self._get_objects().items()), indent=2))
            f.write('\n')

        logging.info('Collecting hciconfig info')
        filename = '%s_hciconfig' % self._test_type
        path = os.path.join(self.resultsdir, filename)
        with open(path, 'a') as f:
            f.write('%s\n' % _SECTION_BREAK)
            f.write('%s: %s\n' % (time.strftime(_TIME_FORMAT), message))
            # Flush our header before the child writes to the same file.
            f.flush()
            hciconfig_path = '/usr/bin/hciconfig'
            try:
                subprocess.check_call([hciconfig_path, '-a'], stdout=f)
            except Exception as e:
                raise error.TestError('hciconfig: %s' % e)

    def os_idle_time_set(self, reset=False):
        """Function to set short idle time or to reset to normal.

        Not using sys_power so that user can use Bluetooth to wake machine.

        @param reset: true to reset to normal idle time, false for short.

        @raises error.TestError: if the command could not be run successfully.

        """
        cmd = ['/usr/bin/set_short_powerd_timeouts']
        if reset:
            # Only pass the flag when needed, instead of an empty argument.
            cmd.append('--reset')
        try:
            subprocess.check_call(cmd)
        except Exception as e:
            raise error.TestError('idle cmd: %s' % e)

    def os_suspend(self):
        """Function to suspend ChromeOS using sys_power."""
        sys_power.do_suspend(5)

        # Pause after do_suspend() returns; presumably this lets the system
        # settle after resuming -- TODO confirm.
        time.sleep(5)

    def initialize(self):
        """Connect to the system dbus used for all BlueZ queries."""
        self._bus = dbus.SystemBus()

    def warmup(self, addrs='', test_phase='client', close_browser=True):
        """Warmup setting parameters for semi-automated Bluetooth Test.

        Actual test steps are implemented in run_once() function.

        @param addrs: list of MAC address of Bluetooth devices under test.
        @param test_phase: for use by server side tests to, for example, call
                           the same test before and after a reboot.
        @param close_browser: True if client side test should close browser
                              at end of test.

        """
        self.login_and_open_browser()

        self._addrs = addrs
        self._test_type = 'start'
        self._test_phase = test_phase
        self._will_close_browser = close_browser

    def cleanup(self):
        """Cleanup of various files/processes opened during test.

        Closes running btmon, closes browser (if asked to at start), and
        deletes files added during test.

        """
        # Tolerates a btmon that was never started or has already exited,
        # so the remaining cleanup steps below always get to run.
        self._stop_dump()
        if hasattr(self, '_will_close_browser') and self._will_close_browser:
            self.close_browser()
        if (hasattr(self, '_added_loop_file')
            and os.path.exists(self._added_loop_file)):
            os.remove(self._added_loop_file)
        if (hasattr(self, '_added_music_file')
            and os.path.exists(self._added_music_file)):
            os.remove(self._added_music_file)